kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-2946; DeleteTopic - protocol and server side implementation
Date Fri, 12 Aug 2016 21:26:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 208ecae23 -> 539633ba0


KAFKA-2946; DeleteTopic - protocol and server side implementation

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <me@ewencp.org>, Jun Rao <junrao@gmail.com>

Closes #1616 from granthenke/delete-wire-new


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/539633ba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/539633ba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/539633ba

Branch: refs/heads/trunk
Commit: 539633ba0eba452960a4d2a3bf07abd79020b329
Parents: 208ecae
Author: Grant Henke <granthenke@gmail.com>
Authored: Fri Aug 12 14:26:46 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Aug 12 14:26:46 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/ApiKeys.java   |   3 +-
 .../apache/kafka/common/protocol/Protocol.java  |  19 ++
 .../kafka/common/requests/AbstractRequest.java  |   2 +
 .../common/requests/CreateTopicsResponse.java   |   1 +
 .../common/requests/DeleteTopicsRequest.java    |  90 +++++++
 .../common/requests/DeleteTopicsResponse.java   |  88 +++++++
 .../common/requests/RequestResponseTest.java    |  16 +-
 .../src/main/scala/kafka/admin/AclCommand.scala |   2 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  28 +-
 .../main/scala/kafka/server/AdminManager.scala  |  44 ++++
 .../kafka/server/DelayedDeleteTopics.scala      |  77 ++++++
 .../src/main/scala/kafka/server/KafkaApis.scala |  51 +++-
 .../kafka/api/AuthorizerIntegrationTest.scala   | 259 ++++++++++---------
 .../unit/kafka/admin/DeleteTopicTest.scala      |  11 +-
 .../unit/kafka/server/BaseRequestTest.scala     |   9 +-
 .../kafka/server/DeleteTopicsRequestTest.scala  | 121 +++++++++
 16 files changed, 668 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index bd00b97..6178b80 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -39,7 +39,8 @@ public enum ApiKeys {
     LIST_GROUPS(16, "ListGroups"),
     SASL_HANDSHAKE(17, "SaslHandshake"),
     API_VERSIONS(18, "ApiVersions"),
-    CREATE_TOPICS(19, "CreateTopics");
+    CREATE_TOPICS(19, "CreateTopics"),
+    DELETE_TOPICS(20, "DeleteTopics");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 907f7ff..d27ec8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -814,6 +814,23 @@ public class Protocol {
     public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0};
     public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0};
 
+    /* DeleteTopic api */
+    public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(
+        new Field("topics",
+            new ArrayOf(STRING),
+            "An array of topics to be deleted."),
+        new Field("timeout",
+            INT32,
+            "The time in ms to wait for a topic to be completely deleted on the controller node. Values <= 0 will trigger topic deletion and return immediately"));
+
+    public static final Schema DELETE_TOPICS_RESPONSE_V0 = new Schema(
+        new Field("topic_error_codes",
+            new ArrayOf(TOPIC_ERROR_CODE),
+            "An array of per topic error codes."));
+
+    public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0};
+    public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -844,6 +861,7 @@ public class Protocol {
         REQUESTS[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_REQUEST;
         REQUESTS[ApiKeys.API_VERSIONS.id] = API_VERSIONS_REQUEST;
         REQUESTS[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_REQUEST;
+        REQUESTS[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -865,6 +883,7 @@ public class Protocol {
         RESPONSES[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_RESPONSE;
         RESPONSES[ApiKeys.API_VERSIONS.id] = API_VERSIONS_RESPONSE;
         RESPONSES[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_RESPONSE;
+        RESPONSES[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

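For reference, the v0 wire format above is simple enough to exercise by hand. The following is a minimal sketch, assuming the schema constants land exactly as declared in this patch; it builds a DeleteTopics v0 request struct and reads it back the way a receiving broker would:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.protocol.types.Schema;
    import org.apache.kafka.common.protocol.types.Struct;

    public class DeleteTopicsSchemaDemo {
        public static void main(String[] args) {
            Schema schema = Protocol.DELETE_TOPICS_REQUEST_V0;

            // Populate the two fields the schema defines: "topics" and "timeout".
            Struct struct = new Struct(schema);
            struct.set("topics", new Object[] {"topic-a", "topic-b"});
            struct.set("timeout", 30000);

            // Serialize, then parse back as the broker does on receipt.
            ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
            struct.writeTo(buffer);
            buffer.rewind();
            Struct parsed = schema.read(buffer);

            System.out.println(parsed.getArray("topics").length + " topics, timeout=" + parsed.getInt("timeout"));
        }
    }
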
http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 6a91825..e6febe5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -78,6 +78,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return ApiVersionsRequest.parse(buffer, versionId);
             case CREATE_TOPICS:
                 return CreateTopicsRequest.parse(buffer, versionId);
+            case DELETE_TOPICS:
+                return DeleteTopicsRequest.parse(buffer, versionId);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 1e6d11e..da8c3ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -39,6 +39,7 @@ public class CreateTopicsResponse extends AbstractRequestResponse {
     /**
      * Possible error codes:
      *
+     * REQUEST_TIMED_OUT(7)
      * INVALID_TOPIC_EXCEPTION(17)
      * CLUSTER_AUTHORIZATION_FAILED(31)
      * TOPIC_ALREADY_EXISTS(36)

http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
new file mode 100644
index 0000000..f78c428
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class DeleteTopicsRequest extends AbstractRequest {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.DELETE_TOPICS.id);
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String TIMEOUT_KEY_NAME = "timeout";
+
+    private final Set<String> topics;
+    private final Integer timeout;
+
+    public DeleteTopicsRequest(Set<String> topics, Integer timeout) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(TOPICS_KEY_NAME, topics.toArray());
+        struct.set(TIMEOUT_KEY_NAME, timeout);
+
+        this.topics = topics;
+        this.timeout = timeout;
+    }
+
+    public DeleteTopicsRequest(Struct struct) {
+        super(struct);
+        Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
+        Set<String> topics = new HashSet<>(topicsArray.length);
+        for (Object topic : topicsArray)
+            topics.add((String) topic);
+
+        this.topics = topics;
+        this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Map<String, Errors> topicErrors = new HashMap<>();
+        for (String topic : topics)
+            topicErrors.put(topic, Errors.forException(e));
+
+        switch (versionId) {
+            case 0:
+                return new DeleteTopicsResponse(topicErrors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id)));
+        }
+    }
+
+    public Set<String> topics() {
+        return topics;
+    }
+
+    public Integer timeout() {
+        return this.timeout;
+    }
+
+    public static DeleteTopicsRequest parse(ByteBuffer buffer, int versionId) {
+        return new DeleteTopicsRequest(ProtoUtils.parseRequest(ApiKeys.DELETE_TOPICS.id, versionId, buffer));
+    }
+
+    public static DeleteTopicsRequest parse(ByteBuffer buffer) {
+        return new DeleteTopicsRequest(CURRENT_SCHEMA.read(buffer));
+    }
+}

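A hedged usage sketch for the new request class (illustrative only; it mirrors the round-trip check added to RequestResponseTest below): construct a request, serialize it, and re-parse it:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.HashSet;
    import org.apache.kafka.common.requests.DeleteTopicsRequest;

    public class DeleteTopicsRequestDemo {
        public static void main(String[] args) {
            DeleteTopicsRequest request =
                new DeleteTopicsRequest(new HashSet<>(Arrays.asList("t1", "t2")), 10000);

            // Round-trip through the wire representation.
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.rewind();

            DeleteTopicsRequest parsed = DeleteTopicsRequest.parse(buffer);
            System.out.println("topics=" + parsed.topics() + ", timeout=" + parsed.timeout());
        }
    }
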
http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
new file mode 100644
index 0000000..e474feb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteTopicsResponse extends AbstractRequestResponse {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DELETE_TOPICS.id);
+    private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error codes:
+     *
+     * REQUEST_TIMED_OUT(7)
+     * INVALID_TOPIC_EXCEPTION(17)
+     * TOPIC_AUTHORIZATION_FAILED(29)
+     * NOT_CONTROLLER(41)
+     */
+    private final Map<String, Errors> errors;
+
+    public DeleteTopicsResponse(Map<String, Errors> errors) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
+        for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
+            Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
+            topicErrorCodeStruct.set(TOPIC_KEY_NAME, topicError.getKey());
+            topicErrorCodeStruct.set(ERROR_CODE_KEY_NAME, topicError.getValue().code());
+            topicErrorCodeStructs.add(topicErrorCodeStruct);
+        }
+        struct.set(TOPIC_ERROR_CODES_KEY_NAME, topicErrorCodeStructs.toArray());
+
+        this.errors = errors;
+    }
+
+    public DeleteTopicsResponse(Struct struct) {
+        super(struct);
+
+        Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
+        Map<String, Errors> errors = new HashMap<>();
+        for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
+            Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj;
+            String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME);
+            short errorCode = topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME);
+            errors.put(topic, Errors.forCode(errorCode));
+        }
+
+        this.errors = errors;
+    }
+
+    public Map<String, Errors> errors() {
+        return errors;
+    }
+
+    public static DeleteTopicsResponse parse(ByteBuffer buffer) {
+        return new DeleteTopicsResponse(CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static DeleteTopicsResponse parse(ByteBuffer buffer, int version) {
+        return new DeleteTopicsResponse(ProtoUtils.responseSchema(ApiKeys.DELETE_TOPICS.id, version).read(buffer));
+    }
+}

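And the matching sketch for the response side (again illustrative): build a response from a per-topic error map and read it back through errors():

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.DeleteTopicsResponse;

    public class DeleteTopicsResponseDemo {
        public static void main(String[] args) {
            Map<String, Errors> errors = new HashMap<>();
            errors.put("t1", Errors.NONE);
            errors.put("t2", Errors.TOPIC_AUTHORIZATION_FAILED);

            DeleteTopicsResponse response = new DeleteTopicsResponse(errors);
            for (Map.Entry<String, Errors> entry : response.errors().entrySet())
                System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
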
http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index afeece7..be7f974 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -102,7 +102,10 @@ public class RequestResponseTest {
                 createApiVersionResponse(),
                 createCreateTopicRequest(),
                 createCreateTopicRequest().getErrorResponse(0, new UnknownServerException()),
-                createCreateTopicResponse()
+                createCreateTopicResponse(),
+                createDeleteTopicsRequest(),
+                createDeleteTopicsRequest().getErrorResponse(0, new UnknownServerException()),
+                createDeleteTopicsResponse()
         );
 
         for (AbstractRequestResponse req : requestResponseList)
@@ -479,4 +482,15 @@ public class RequestResponseTest {
         errors.put("t2", Errors.LEADER_NOT_AVAILABLE);
         return new CreateTopicsResponse(errors);
     }
+
+    private AbstractRequest createDeleteTopicsRequest() {
+        return new DeleteTopicsRequest(new HashSet<>(Arrays.asList("my_t1", "my_t2")), 10000);
+    }
+
+    private AbstractRequestResponse createDeleteTopicsResponse() {
+        Map<String, Errors> errors = new HashMap<>();
+        errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
+        errors.put("t2", Errors.TOPIC_AUTHORIZATION_FAILED);
+        return new DeleteTopicsResponse(errors);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 080f809..8548ebc 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -29,7 +29,7 @@ object AclCommand {
 
   val Newline = scala.util.Properties.lineSeparator
   val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
-    Topic -> Set(Read, Write, Describe, All),
+    Topic -> Set(Read, Write, Describe, All, Delete),
     Group -> Set(Read, All),
     Cluster -> Set(Create, ClusterAction, All)
   )

http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index aa6ebe2..d8702df 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -308,14 +308,18 @@ object AdminUtils extends Logging {
   }
 
   def deleteTopic(zkUtils: ZkUtils, topic: String) {
-    try {
-      zkUtils.createPersistentPath(getDeleteTopicPath(topic))
-    } catch {
-      case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
-        "topic %s is already marked for deletion".format(topic))
-      case e2: Throwable => throw new AdminOperationException(e2.toString)
+      if (topicExists(zkUtils, topic)) {
+        try {
+          zkUtils.createPersistentPath(getDeleteTopicPath(topic))
+        } catch {
+          case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+            "topic %s is already marked for deletion".format(topic))
+          case e2: Throwable => throw new AdminOperationException(e2)
+        }
+      } else {
+        throw new InvalidTopicException("topic %s to delete does not exist".format(topic))
+      }
     }
-  }
 
   def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
     zkUtils.getConsumersInGroup(group).nonEmpty
@@ -487,7 +491,7 @@ object AdminUtils extends Logging {
    *
    */
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
-    if(!topicExists(zkUtils, topic))
+    if (!topicExists(zkUtils, topic))
       throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
     // remove the topic overrides
     LogConfig.validate(configs)
@@ -526,7 +530,7 @@ object AdminUtils extends Logging {
   def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entity: String): Properties = {
     val str: String = zkUtils.zkClient.readData(getEntityConfigPath(entityType, entity), true)
     val props = new Properties()
-    if(str != null) {
+    if (str != null) {
       Json.parseFull(str) match {
         case None => // there are no config overrides
         case Some(mapAnon: Map[_, _]) =>
@@ -571,7 +575,7 @@ object AdminUtils extends Logging {
                                        zkUtils: ZkUtils,
                                        cachedBrokerInfo: mutable.HashMap[Int, Broker],
                                        protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): MetadataResponse.TopicMetadata = {
-    if(zkUtils.pathExists(getTopicPath(topic))) {
+    if (zkUtils.pathExists(getTopicPath(topic))) {
       val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get
       val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
       val partitionMetadata = sortedPartitions.map { partitionMap =>
@@ -600,10 +604,10 @@ object AdminUtils extends Logging {
           } catch {
             case e: Throwable => throw new ReplicaNotAvailableException(e)
           }
-          if(replicaInfo.size < replicas.size)
+          if (replicaInfo.size < replicas.size)
             throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
               replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-          if(isrInfo.size < inSyncReplicas.size)
+          if (isrInfo.size < inSyncReplicas.size)
             throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
               inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
           new MetadataResponse.PartitionMetadata(Errors.NONE, partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)

http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index fc3a7f0..9c5fe73 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.util.Properties
 
 import kafka.admin.AdminUtils
+import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
@@ -111,4 +112,47 @@ class AdminManager(val config: KafkaConfig,
       topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
     }
   }
+
+  /**
+    * Delete topics and wait until the topics have been completely deleted.
+    * The callback function will be triggered on timeout, on error, or once the topics are deleted.
+    */
+  def deleteTopics(timeout: Int,
+                   topics: Set[String],
+                   responseCallback: Map[String, Errors] => Unit) {
+
+    // 1. map over topics calling the asynchronous delete
+    val metadata = topics.map { topic =>
+        try {
+          AdminUtils.deleteTopic(zkUtils, topic)
+          DeleteTopicMetadata(topic, Errors.NONE)
+        } catch {
+          case e: TopicAlreadyMarkedForDeletionException =>
+            // swallow the exception and still track the deletion, allowing multiple calls to wait for it
+            DeleteTopicMetadata(topic, Errors.NONE)
+          case e: Throwable =>
+            error(s"Error processing delete topic request for topic $topic", e)
+            DeleteTopicMetadata(topic, Errors.forException(e))
+        }
+    }
+
+    // 2. if timeout <= 0 or no topics can proceed, return immediately
+    if (timeout <= 0 || !metadata.exists(_.error == Errors.NONE)) {
+      val results = metadata.map { deleteTopicMetadata =>
+        // ignore topics that already have errors
+        if (deleteTopicMetadata.error == Errors.NONE) {
+          (deleteTopicMetadata.topic, Errors.REQUEST_TIMED_OUT)
+        } else {
+          (deleteTopicMetadata.topic, deleteTopicMetadata.error)
+        }
+      }.toMap
+      responseCallback(results)
+    } else {
+      // 3. else pass the topics and errors to the delayed operation and set the keys
+      val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback)
+      val delayedDeleteKeys = topics.map(new TopicKey(_)).toSeq
+      // try to complete the request immediately, otherwise put it into the purgatory
+      topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys)
+    }
+  }
 }

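The timeout semantics above deserve a note: a timeout <= 0 still triggers deletion, but the response then reports REQUEST_TIMED_OUT for every accepted topic, because completion was never awaited. A hypothetical client-side interpretation (the handler name is not from this commit):

    import java.util.Map;
    import org.apache.kafka.common.protocol.Errors;

    public class DeleteTopicsResultHandler {
        // Hypothetical helper: classify each per-topic error per the semantics above.
        static void handle(Map<String, Errors> errors) {
            for (Map.Entry<String, Errors> entry : errors.entrySet()) {
                switch (entry.getValue()) {
                    case NONE:
                        // Deletion completed within the requested timeout.
                        break;
                    case REQUEST_TIMED_OUT:
                        // Deletion was triggered but had not finished when the timeout
                        // (or a timeout <= 0) elapsed; it continues in the background.
                        break;
                    case NOT_CONTROLLER:
                        // Retry against the current controller.
                        break;
                    default:
                        // Authorization or validation failure; deletion was not triggered.
                        break;
                }
            }
        }
    }
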
http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala b/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala
new file mode 100644
index 0000000..95d6f50
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala
@@ -0,0 +1,77 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import org.apache.kafka.common.protocol.Errors
+
+import scala.collection._
+
+/**
+  * The delete metadata maintained by the delayed delete operation
+  */
+case class DeleteTopicMetadata(topic: String, error: Errors)
+
+/**
+  * A delayed delete topics operation that can be created by the admin manager and watched
+  * in the topic purgatory
+  */
+class DelayedDeleteTopics(delayMs: Long,
+                          deleteMetadata: Seq[DeleteTopicMetadata],
+                          adminManager: AdminManager,
+                          responseCallback: Map[String, Errors] => Unit)
+  extends DelayedOperation(delayMs) {
+
+  /**
+    * The operation can be completed if all of the topics not in error have been removed
+    */
+  override def tryComplete() : Boolean = {
+    trace(s"Trying to complete operation for $deleteMetadata")
+
+    // Ignore topics that already have errors
+    val existingTopics = deleteMetadata.count { metadata => metadata.error == Errors.NONE && topicExists(metadata.topic) }
+
+    if (existingTopics == 0) {
+      trace("All topics have been deleted or have errors, completing the delayed operation")
+      forceComplete()
+    } else {
+      trace(s"$existingTopics topics still exist, not completing the delayed operation")
+      false
+    }
+  }
+
+  /**
+    * Check for topics that still exist, update their error codes and call the responseCallback
+    */
+  override def onComplete() {
+    trace(s"Completing operation for $deleteMetadata")
+    val results = deleteMetadata.map { metadata =>
+      // ignore topics that already have errors
+      if (metadata.error == Errors.NONE && topicExists(metadata.topic))
+        (metadata.topic, Errors.REQUEST_TIMED_OUT)
+      else
+        (metadata.topic, metadata.error)
+    }.toMap
+    responseCallback(results)
+  }
+
+  override def onExpiration(): Unit = { }
+
+  private def topicExists(topic: String): Boolean = {
+    adminManager.metadataCache.contains(topic)
+  }
+}

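The purgatory contract here: tryComplete() re-checks the metadata cache on every watched-key event and forces completion once no error-free topic remains, and onComplete() marks any survivor REQUEST_TIMED_OUT. A minimal Java rendering of the completion predicate, with a hypothetical metadata holder standing in for the Scala case class:

    import java.util.List;
    import java.util.function.Predicate;
    import org.apache.kafka.common.protocol.Errors;

    final class DeleteTopicCompletionCheck {
        // Hypothetical mirror of the Scala DeleteTopicMetadata case class.
        static final class Metadata {
            final String topic;
            final Errors error;
            Metadata(String topic, Errors error) { this.topic = topic; this.error = error; }
        }

        // Complete once every topic is either deleted or already carries an error.
        static boolean canComplete(List<Metadata> metadata, Predicate<String> topicExists) {
            return metadata.stream().noneMatch(m -> m.error == Errors.NONE && topicExists.test(m.topic));
        }
    }
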
http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c85de0..6d38f85 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -33,19 +33,18 @@ import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth
-import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write}
+import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write, Delete}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
-import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse}
+import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, SaslHandshakeResponse, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection._
 import scala.collection.JavaConverters._
-import org.apache.kafka.common.requests.SaslHandshakeResponse
 
 /**
  * Logic to handle the various Kafka requests
@@ -94,6 +93,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
         case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
+        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -676,7 +676,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else if (config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
-          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, Topic.isInternal(topic),
+          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
             java.util.Collections.emptyList())
         }
       }
@@ -1092,6 +1092,49 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleDeleteTopicsRequest(request: RequestChannel.Request) {
+    val deleteTopicRequest = request.body.asInstanceOf[DeleteTopicsRequest]
+
+    val (authorizedTopics, unauthorizedTopics) = deleteTopicRequest.topics.asScala.partition( topic =>
+      authorize(request.session, Delete, new Resource(auth.Topic, topic))
+    )
+
+    val unauthorizedResults = unauthorizedTopics.map ( topic =>
+      // Avoid leaking that the topic exists if the user is not authorized to describe the topic
+      if (authorize(request.session, Describe, new Resource(auth.Topic, topic))) {
+        (topic, Errors.TOPIC_AUTHORIZATION_FAILED)
+      } else {
+        (topic, Errors.INVALID_TOPIC_EXCEPTION)
+      }
+    ).toMap
+
+    def sendResponseCallback(results: Map[String, Errors]): Unit = {
+      val completeResults = results ++ unauthorizedResults
+      val respHeader = new ResponseHeader(request.header.correlationId)
+      val responseBody = new DeleteTopicsResponse(completeResults.asJava)
+      trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+    }
+
+    if (!controller.isActive()) {
+      val results = deleteTopicRequest.topics.asScala.map { case topic =>
+        (topic, Errors.NOT_CONTROLLER)
+      }.toMap
+      sendResponseCallback(results)
+    } else {
+      // If there are no authorized topics, return immediately
+      if (authorizedTopics.isEmpty)
+        sendResponseCallback(Map())
+      else {
+        adminManager.deleteTopics(
+          deleteTopicRequest.timeout.toInt,
+          authorizedTopics,
+          sendResponseCallback
+        )
+      }
+    }
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

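The handler above encodes a deliberate information-hiding rule: a caller holding neither Delete nor Describe on a topic gets INVALID_TOPIC_EXCEPTION rather than an authorization error, so it cannot probe for topic existence. A hypothetical truth-table helper (not part of this commit) capturing the per-topic mapping:

    import org.apache.kafka.common.protocol.Errors;

    final class DeleteAuthOutcome {
        // Hypothetical helper: the error a client should expect for one topic,
        // following the branching in handleDeleteTopicsRequest above.
        static Errors expected(boolean isController, boolean canDelete, boolean canDescribe) {
            if (!isController)
                return Errors.NOT_CONTROLLER;            // all topics, regardless of ACLs
            if (canDelete)
                return Errors.NONE;                      // or REQUEST_TIMED_OUT if deletion is slow
            if (canDescribe)
                return Errors.TOPIC_AUTHORIZATION_FAILED;
            return Errors.INVALID_TOPIC_EXCEPTION;       // hides whether the topic exists
        }
    }
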
http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f35151f..332b681 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -12,18 +12,14 @@
  */
 package kafka.api
 
-import java.io.{DataInputStream, DataOutputStream}
-import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.concurrent.ExecutionException
 import java.util.{ArrayList, Collections, Properties}
 
-import kafka.cluster.EndPoint
 import kafka.common
 import kafka.common.TopicAndPartition
-import kafka.integration.KafkaServerTestHarness
 import kafka.security.auth._
-import kafka.server.KafkaConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -40,11 +36,15 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
 
-class AuthorizerIntegrationTest extends KafkaServerTestHarness {
+class AuthorizerIntegrationTest extends BaseRequestTest {
+
+  override def numBrokers: Int = 1
+  val brokerId: Integer = 0
+
   val topic = "topic"
   val createTopic = "topic-new"
+  val deleteTopic = "topic-delete"
   val part = 0
-  val brokerId: Integer = 0
   val correlationId = 0
   val clientId = "client-Id"
   val tp = new TopicPartition(topic, part)
@@ -52,6 +52,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   val group = "my-group"
   val topicResource = new Resource(Topic, topic)
   val groupResource = new Resource(Group, group)
+  val deleteTopicResource = new Resource(Topic, deleteTopic)
 
   val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
@@ -59,83 +60,82 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val TopicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
 
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
   val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
-  val numServers = 1
   val producerCount = 1
   val consumerCount = 2
   val producerConfig = new Properties
   val numRecords = 1
 
-  val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
-  overridingProps.put(KafkaConfig.BrokerIdProp, brokerId.toString)
-  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-
-  val endPoint = new EndPoint("localhost", 0, SecurityProtocol.PLAINTEXT)
-
-  val RequestKeyToResponseDeserializer: Map[Short, Class[_ <: Any]] =
-    Map(ApiKeys.METADATA.id -> classOf[requests.MetadataResponse],
-      ApiKeys.PRODUCE.id -> classOf[requests.ProduceResponse],
-      ApiKeys.FETCH.id -> classOf[requests.FetchResponse],
-      ApiKeys.LIST_OFFSETS.id -> classOf[requests.ListOffsetResponse],
-      ApiKeys.OFFSET_COMMIT.id -> classOf[requests.OffsetCommitResponse],
-      ApiKeys.OFFSET_FETCH.id -> classOf[requests.OffsetFetchResponse],
-      ApiKeys.GROUP_COORDINATOR.id -> classOf[requests.GroupCoordinatorResponse],
-      ApiKeys.UPDATE_METADATA_KEY.id -> classOf[requests.UpdateMetadataResponse],
-      ApiKeys.JOIN_GROUP.id -> classOf[JoinGroupResponse],
-      ApiKeys.SYNC_GROUP.id -> classOf[SyncGroupResponse],
-      ApiKeys.HEARTBEAT.id -> classOf[HeartbeatResponse],
-      ApiKeys.LEAVE_GROUP.id -> classOf[LeaveGroupResponse],
-      ApiKeys.LEADER_AND_ISR.id -> classOf[requests.LeaderAndIsrResponse],
-      ApiKeys.STOP_REPLICA.id -> classOf[requests.StopReplicaResponse],
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> classOf[requests.ControlledShutdownResponse],
-      ApiKeys.CREATE_TOPICS.id -> classOf[CreateTopicsResponse]
+  override def propertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+    properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+    properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  }
+
+  val RequestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
+    Map(ApiKeys.METADATA -> classOf[requests.MetadataResponse],
+      ApiKeys.PRODUCE -> classOf[requests.ProduceResponse],
+      ApiKeys.FETCH -> classOf[requests.FetchResponse],
+      ApiKeys.LIST_OFFSETS -> classOf[requests.ListOffsetResponse],
+      ApiKeys.OFFSET_COMMIT -> classOf[requests.OffsetCommitResponse],
+      ApiKeys.OFFSET_FETCH -> classOf[requests.OffsetFetchResponse],
+      ApiKeys.GROUP_COORDINATOR -> classOf[requests.GroupCoordinatorResponse],
+      ApiKeys.UPDATE_METADATA_KEY -> classOf[requests.UpdateMetadataResponse],
+      ApiKeys.JOIN_GROUP -> classOf[JoinGroupResponse],
+      ApiKeys.SYNC_GROUP -> classOf[SyncGroupResponse],
+      ApiKeys.HEARTBEAT -> classOf[HeartbeatResponse],
+      ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse],
+      ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse],
+      ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse],
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse],
+      ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse],
+      ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse]
   )
 
-  val RequestKeyToErrorCode = Map[Short, (Nothing) => Short](
-    ApiKeys.METADATA.id -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
-    ApiKeys.PRODUCE.id -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
-    ApiKeys.FETCH.id -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    ApiKeys.LIST_OFFSETS.id -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    ApiKeys.OFFSET_COMMIT.id -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
-    ApiKeys.OFFSET_FETCH.id -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    ApiKeys.GROUP_COORDINATOR.id -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()),
-    ApiKeys.UPDATE_METADATA_KEY.id -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()),
-    ApiKeys.JOIN_GROUP.id -> ((resp: JoinGroupResponse) => resp.errorCode()),
-    ApiKeys.SYNC_GROUP.id -> ((resp: SyncGroupResponse) => resp.errorCode()),
-    ApiKeys.HEARTBEAT.id -> ((resp: HeartbeatResponse) => resp.errorCode()),
-    ApiKeys.LEAVE_GROUP.id -> ((resp: LeaveGroupResponse) => resp.errorCode()),
-    ApiKeys.LEADER_AND_ISR.id -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    ApiKeys.STOP_REPLICA.id -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()),
-    ApiKeys.CREATE_TOPICS.id -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.code)
+  val RequestKeyToErrorCode = Map[ApiKeys, (Nothing) => Short](
+    ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
+    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
+    ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.GROUP_COORDINATOR -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()),
+    ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()),
+    ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.errorCode()),
+    ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.errorCode()),
+    ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.errorCode()),
+    ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.errorCode()),
+    ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()),
+    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.code),
+    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2.code)
   )
 
-  val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]](
-    ApiKeys.METADATA.id -> TopicDescribeAcl,
-    ApiKeys.PRODUCE.id -> TopicWriteAcl,
-    ApiKeys.FETCH.id -> TopicReadAcl,
-    ApiKeys.LIST_OFFSETS.id -> TopicDescribeAcl,
-    ApiKeys.OFFSET_COMMIT.id -> (TopicReadAcl ++ GroupReadAcl),
-    ApiKeys.OFFSET_FETCH.id -> (TopicReadAcl ++ GroupReadAcl),
-    ApiKeys.GROUP_COORDINATOR.id -> (TopicReadAcl ++ GroupReadAcl),
-    ApiKeys.UPDATE_METADATA_KEY.id -> ClusterAcl,
-    ApiKeys.JOIN_GROUP.id -> GroupReadAcl,
-    ApiKeys.SYNC_GROUP.id -> GroupReadAcl,
-    ApiKeys.HEARTBEAT.id -> GroupReadAcl,
-    ApiKeys.LEAVE_GROUP.id -> GroupReadAcl,
-    ApiKeys.LEADER_AND_ISR.id -> ClusterAcl,
-    ApiKeys.STOP_REPLICA.id -> ClusterAcl,
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ClusterAcl,
-    ApiKeys.CREATE_TOPICS.id -> ClusterCreateAcl
+  val RequestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
+    ApiKeys.METADATA -> TopicDescribeAcl,
+    ApiKeys.PRODUCE -> TopicWriteAcl,
+    ApiKeys.FETCH -> TopicReadAcl,
+    ApiKeys.LIST_OFFSETS -> TopicDescribeAcl,
+    ApiKeys.OFFSET_COMMIT -> (TopicReadAcl ++ GroupReadAcl),
+    ApiKeys.OFFSET_FETCH -> (TopicReadAcl ++ GroupReadAcl),
+    ApiKeys.GROUP_COORDINATOR -> (TopicReadAcl ++ GroupReadAcl),
+    ApiKeys.UPDATE_METADATA_KEY -> ClusterAcl,
+    ApiKeys.JOIN_GROUP -> GroupReadAcl,
+    ApiKeys.SYNC_GROUP -> GroupReadAcl,
+    ApiKeys.HEARTBEAT -> GroupReadAcl,
+    ApiKeys.LEAVE_GROUP -> GroupReadAcl,
+    ApiKeys.LEADER_AND_ISR -> ClusterAcl,
+    ApiKeys.STOP_REPLICA -> ClusterAcl,
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ClusterAcl,
+    ApiKeys.CREATE_TOPICS -> ClusterCreateAcl,
+    ApiKeys.DELETE_TOPICS -> TopicDeleteAcl
   )
 
-  // configure the servers and clients
-  override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
-
   @Before
   override def setUp() {
     super.setUp()
@@ -156,6 +156,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       servers.head.groupCoordinator.offsetsTopicConfigs)
     // create the test topic with all the brokers as replicas
     TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
+    TestUtils.createTopic(zkUtils, deleteTopic, 1, 1, this.servers)
   }
 
   @After
@@ -236,36 +237,39 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     new CreateTopicsRequest(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0)
   }
 
+  private def deleteTopicsRequest = {
+    new DeleteTopicsRequest(Set(deleteTopic).asJava, 5000)
+  }
+
   @Test
   def testAuthorization() {
-    val requestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
-      ApiKeys.METADATA.id -> createMetadataRequest,
-      ApiKeys.PRODUCE.id -> createProduceRequest,
-      ApiKeys.FETCH.id -> createFetchRequest,
-      ApiKeys.LIST_OFFSETS.id -> createListOffsetsRequest,
-      ApiKeys.OFFSET_FETCH.id -> createOffsetFetchRequest,
-      ApiKeys.GROUP_COORDINATOR.id -> createGroupCoordinatorRequest,
-      ApiKeys.UPDATE_METADATA_KEY.id -> createUpdateMetadataRequest,
-      ApiKeys.JOIN_GROUP.id -> createJoinGroupRequest,
-      ApiKeys.SYNC_GROUP.id -> createSyncGroupRequest,
-      ApiKeys.OFFSET_COMMIT.id -> createOffsetCommitRequest,
-      ApiKeys.HEARTBEAT.id -> createHeartbeatRequest,
-      ApiKeys.LEAVE_GROUP.id -> createLeaveGroupRequest,
-      ApiKeys.LEADER_AND_ISR.id -> createLeaderAndIsrRequest,
-      ApiKeys.STOP_REPLICA.id -> createStopReplicaRequest,
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> createControlledShutdownRequest,
-      ApiKeys.CREATE_TOPICS.id -> createTopicsRequest
+    val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
+      ApiKeys.METADATA -> createMetadataRequest,
+      ApiKeys.PRODUCE -> createProduceRequest,
+      ApiKeys.FETCH -> createFetchRequest,
+      ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
+      ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
+      ApiKeys.GROUP_COORDINATOR -> createGroupCoordinatorRequest,
+      ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest,
+      ApiKeys.JOIN_GROUP -> createJoinGroupRequest,
+      ApiKeys.SYNC_GROUP -> createSyncGroupRequest,
+      ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
+      ApiKeys.HEARTBEAT -> createHeartbeatRequest,
+      ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest,
+      ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest,
+      ApiKeys.STOP_REPLICA -> createStopReplicaRequest,
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest,
+      ApiKeys.CREATE_TOPICS -> createTopicsRequest,
+      ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
     )
 
-    val socket = new Socket("localhost", servers.head.boundPort())
-
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls
       val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
-      sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = false)
+      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false)
       for ((resource, acls) <- RequestKeysToAcls(key))
         addAndVerifyAcls(acls, resource)
-      sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = true)
+      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true)
     }
   }
 
@@ -526,6 +530,32 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     this.consumers.head.partitionsFor(topic)
   }
 
+  @Test
+  def testUnauthorizedDeleteWithoutDescribe() {
+    val response = send(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
+    val deleteResponse = DeleteTopicsResponse.parse(response)
+
+    assertEquals(Errors.INVALID_TOPIC_EXCEPTION, deleteResponse.errors.asScala.head._2)
+  }
+
+  @Test
+  def testUnauthorizedDeleteWithDescribe() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), deleteTopicResource)
+    val response = send(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
+    val deleteResponse = DeleteTopicsResponse.parse(response)
+
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2)
+  }
+
+  @Test
+  def testDeleteWithWildCardAuth() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+    val response = send(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
+    val deleteResponse = DeleteTopicsResponse.parse(response)
+
+    assertEquals(Errors.NONE, deleteResponse.errors.asScala.head._2)
+  }
+
   def removeAllAcls() = {
     servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
       servers.head.apis.authorizer.get.removeAcls(resource)
@@ -533,51 +563,30 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     }
   }
 
-  def sendRequestAndVerifyResponseErrorCode(socket: Socket,
-                                            key: Short,
+  def sendRequestAndVerifyResponseErrorCode(apiKey: ApiKeys,
                                             request: AbstractRequest,
                                             resources: Set[ResourceType],
                                             isAuthorized: Boolean): AbstractRequestResponse = {
-    val header = new RequestHeader(key, "client", 1)
-    val body = request.toStruct
-
-    val buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf())
-    header.writeTo(buffer)
-    body.writeTo(buffer)
-    buffer.rewind()
-    val requestBytes = buffer.array()
-
-    sendRequest(socket, key, requestBytes)
-    val resp = receiveResponse(socket)
-    ResponseHeader.parse(resp)
-
-    val response = RequestKeyToResponseDeserializer(key).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
-    val errorCode = RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response)
+    val resp = send(request, apiKey)
+    val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
+    val errorCode = RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response)
+
+    val possibleErrorCodes = resources.flatMap { resourceType =>
+      if(resourceType == Topic)
+        // When completely unauthorized, topic resources may return INVALID_TOPIC_EXCEPTION to avoid leaking topic names
+        Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code())
+      else
+        Seq(resourceType.errorCode)
+    }
 
-    val possibleErrorCodes = resources.map(_.errorCode)
     if (isAuthorized)
-      assertFalse(s"${ApiKeys.forId(key)} should be allowed", possibleErrorCodes.contains(errorCode))
+      assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode))
     else
-      assertTrue(s"${ApiKeys.forId(key)} should be forbidden", possibleErrorCodes.contains(errorCode))
+      assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode))
 
     response
   }
 
-  private def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
-    val outgoing = new DataOutputStream(socket.getOutputStream)
-    outgoing.writeInt(request.length)
-    outgoing.write(request)
-    outgoing.flush()
-  }
-
-  private def receiveResponse(socket: Socket): ByteBuffer = {
-    val incoming = new DataInputStream(socket.getInputStream)
-    val len = incoming.readInt()
-    val response = new Array[Byte](len)
-    incoming.readFully(response)
-    ByteBuffer.wrap(response)
-  }
-
   private def sendRecords(numRecords: Int, tp: TopicPartition) {
     val futures = (0 until numRecords).map { i =>
       this.producers.head.send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes))

http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ac23941..ea5a213 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -20,11 +20,13 @@ import kafka.log.Log
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils
 import kafka.utils.ZkUtils._
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
 import org.junit.Test
 import java.util.Properties
+
 import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import org.apache.kafka.common.errors.InvalidTopicException
 
 class DeleteTopicTest extends ZooKeeperTestHarness {
 
@@ -202,7 +204,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = topicAndPartition.topic
     val servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, "test2")
+    try {
+      AdminUtils.deleteTopic(zkUtils, "test2")
+      fail("Expected InvalidTopicException")
+    } catch {
+      case e: InvalidTopicException => // expected exception
+    }
     // verify delete topic path for test2 is removed from zookeeper
     TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
     // verify that topic test is untouched

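The new expectation matches the AdminUtils change elsewhere in this commit:
deleteTopic now validates that the topic exists before writing the deletion
marker into ZooKeeper. A rough sketch of that guard, with the message text
assumed (the real change lives in core/src/main/scala/kafka/admin/AdminUtils.scala):

    import kafka.utils.ZkUtils
    import kafka.utils.ZkUtils.getDeleteTopicPath
    import org.apache.kafka.common.errors.InvalidTopicException

    // Sketch: reject deletion of a topic that does not exist instead of
    // silently creating an orphaned /admin/delete_topics node.
    def deleteTopic(zkUtils: ZkUtils, topic: String): Unit = {
      if (AdminUtils.topicExists(zkUtils, topic))
        zkUtils.createPersistentPath(getDeleteTopicPath(topic))
      else
        throw new InvalidTopicException(s"Topic $topic to delete does not exist")
    }
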
http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 6909ed3..35dbbf0 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -39,19 +39,14 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
   protected def propertyOverrides(properties: Properties) {}
 
   def generateConfigs() = {
-    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false,
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
+      enableControlledShutdown = false, enableDeleteTopic = true,
       interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = saslProperties)
     props.foreach(propertyOverrides)
     props.map(KafkaConfig.fromProps)
   }
 
-  @Before
-  override def setUp() {
-    super.setUp()
-    TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
-  }
-
   def anySocketServer = {
     servers.find { server =>
       val state = server.brokerState.currentState

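The added enableDeleteTopic = true flag matters for every test in this class:
brokers only act on deletion when topic deletion is enabled. The flag maps to
the standard broker property, roughly:

    import java.util.Properties

    // Sketch: the broker-side setting toggled by the enableDeleteTopic flag
    // of TestUtils.createBrokerConfigs. Without it, topics marked for
    // deletion are never actually removed.
    val overrides = new Properties()
    overrides.put("delete.topic.enable", "true")
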
http://git-wip-us.apache.org/repos/asf/kafka/blob/539633ba/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
new file mode 100644
index 0000000..4ef1af1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -0,0 +1,121 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.network.SocketServer
+import kafka.utils._
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, MetadataRequest, MetadataResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class DeleteTopicsRequestTest extends BaseRequestTest {
+
+  @Test
+  def testValidDeleteTopicRequests() {
+    val timeout = 10000
+    // Single topic
+    TestUtils.createTopic(zkUtils, "topic-1", 1, 1, servers)
+    validateValidDeleteTopicRequests(new DeleteTopicsRequest(Set("topic-1").asJava, timeout))
+    // Multi topic
+    TestUtils.createTopic(zkUtils, "topic-3", 5, 2, servers)
+    TestUtils.createTopic(zkUtils, "topic-4", 1, 2, servers)
+    validateValidDeleteTopicRequests(new DeleteTopicsRequest(Set("topic-3", "topic-4").asJava, timeout))
+  }
+
+  private def validateValidDeleteTopicRequests(request: DeleteTopicsRequest): Unit = {
+    val response = sendDeleteTopicsRequest(request, 0)
+
+    val error = response.errors.values.asScala.find(_ != Errors.NONE)
+    assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
+
+    request.topics.asScala.foreach { topic =>
+      validateTopicIsDeleted(topic)
+    }
+  }
+
+  @Test
+  def testErrorDeleteTopicRequests() {
+    val timeout = 30000
+    val timeoutTopic = "invalid-timeout"
+
+    // Basic
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set("invalid-topic").asJava, timeout),
+      Map("invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION))
+
+    // Partial
+    TestUtils.createTopic(zkUtils, "partial-topic-1", 1, 1, servers)
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set(
+      "partial-topic-1",
+      "partial-invalid-topic").asJava, timeout),
+      Map(
+        "partial-topic-1" -> Errors.NONE,
+        "partial-invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION
+      )
+    )
+
+    // Timeout
+    TestUtils.createTopic(zkUtils, timeoutTopic, 5, 2, servers)
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set(timeoutTopic).asJava, 1),
+      Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
+    // The topic should still get deleted eventually
+    TestUtils.waitUntilTrue(() => !servers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic was never deleted")
+    validateTopicIsDeleted(timeoutTopic)
+  }
+
+  private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, expectedResponse: Map[String, Errors]): Unit = {
+    val response = sendDeleteTopicsRequest(request, 0)
+    val errors = response.errors.asScala
+    assertEquals("The response size should match", expectedResponse.size, response.errors.size)
+
+    expectedResponse.foreach { case (topic, expectedError) =>
+      assertEquals("The response error should match", expectedResponse(topic), errors(topic))
+      // If no error validate the topic was deleted
+      if (expectedError == Errors.NONE) {
+        validateTopicIsDeleted(topic)
+      }
+    }
+  }
+
+  @Test
+  def testNotController() {
+    val request = new DeleteTopicsRequest(Set("not-controller").asJava, 1000)
+    val response = sendDeleteTopicsRequest(request, 0, notControllerSocketServer)
+
+    val error = response.errors.asScala.head._2
+    assertEquals("Expected controller error when routed incorrectly",  Errors.NOT_CONTROLLER, error)
+  }
+
+  private def validateTopicIsDeleted(topic: String): Unit = {
+    // Re-fetch metadata on every check so the wait observes the deletion
+    // completing, rather than polling a single stale snapshot
+    TestUtils.waitUntilTrue(() => !sendMetadataRequest(new MetadataRequest(List(topic).asJava)).topicMetadata.asScala
+      .exists(p => p.topic.equals(topic) && p.error() == Errors.NONE),
+      s"The topic $topic should not exist")
+  }
+
+  private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, version: Short, socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = {
+    val response = send(request, ApiKeys.DELETE_TOPICS, Some(version), socketServer)
+    DeleteTopicsResponse.parse(response, version)
+  }
+
+  private def sendMetadataRequest(request: MetadataRequest): MetadataResponse = {
+    val response = send(request, ApiKeys.METADATA)
+    MetadataResponse.parse(response)
+  }
+}
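For reference, the v0 wire shape exercised above is small: the request carries
an array of topic names plus an int32 timeout in milliseconds, and the response
maps each topic to an error code. A condensed client-side sketch using the
request objects from this commit (topic names hypothetical):

    import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse}
    import scala.collection.JavaConverters._

    // Build a DeleteTopics request for two topics with a 10s server-side timeout.
    val request = new DeleteTopicsRequest(Set("topic-a", "topic-b").asJava, 10000)

    // After sending it to the controller (see sendDeleteTopicsRequest above),
    // inspect the per-topic outcome.
    def summarize(response: DeleteTopicsResponse): Unit =
      response.errors.asScala.foreach { case (topic, error) =>
        println(s"$topic -> $error")
      }
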

