kafka-commits mailing list archives

From: ij...@apache.org
Subject: [1/3] kafka git commit: KAFKA-3265; Add a public AdminClient API in Java (KIP-117)
Date: Mon, 01 May 2017 23:24:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c96656efb -> 4aed28d18


http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 80e9191..65bec4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -83,114 +83,388 @@ import java.util.Map;
  * Do not add exceptions that occur only on the client or only on the server here.
  */
 public enum Errors {
-    UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
-    NONE(0, null),
-    OFFSET_OUT_OF_RANGE(1,
-            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
-    CORRUPT_MESSAGE(2,
-            new CorruptRecordException("This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.")),
-    UNKNOWN_TOPIC_OR_PARTITION(3,
-            new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
-    INVALID_FETCH_SIZE(4,
-            new InvalidFetchSizeException("The requested fetch size is invalid.")),
-    LEADER_NOT_AVAILABLE(5,
-            new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
-    NOT_LEADER_FOR_PARTITION(6,
-            new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
-    REQUEST_TIMED_OUT(7,
-            new TimeoutException("The request timed out.")),
-    BROKER_NOT_AVAILABLE(8,
-            new BrokerNotAvailableException("The broker is not available.")),
-    REPLICA_NOT_AVAILABLE(9,
-            new ReplicaNotAvailableException("The replica is not available for the requested topic-partition")),
-    MESSAGE_TOO_LARGE(10,
-            new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
-    STALE_CONTROLLER_EPOCH(11,
-            new ControllerMovedException("The controller moved to another broker.")),
-    OFFSET_METADATA_TOO_LARGE(12,
-            new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
-    NETWORK_EXCEPTION(13,
-            new NetworkException("The server disconnected before a response was received.")),
-    COORDINATOR_LOAD_IN_PROGRESS(14,
-            new CoordinatorLoadInProgressException("The coordinator is loading and hence can't process requests.")),
-    COORDINATOR_NOT_AVAILABLE(15,
-            new CoordinatorNotAvailableException("The coordinator is not available.")),
-    NOT_COORDINATOR(16,
-            new NotCoordinatorException("This is not the correct coordinator.")),
-    INVALID_TOPIC_EXCEPTION(17,
-            new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
-    RECORD_LIST_TOO_LARGE(18,
-            new RecordBatchTooLargeException("The request included a message batch larger than the configured segment size on the server.")),
-    NOT_ENOUGH_REPLICAS(19,
-            new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
-    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
-            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
-    INVALID_REQUIRED_ACKS(21,
-            new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
-    ILLEGAL_GENERATION(22,
-            new IllegalGenerationException("Specified group generation id is not valid.")),
+    UNKNOWN(-1, "The server experienced an unexpected error when processing the request",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnknownServerException(message);
+            }
+        }),
+    NONE(0, null,
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return null;
+            }
+        }),
+    OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new OffsetOutOfRangeException(message);
+            }
+        }),
+    CORRUPT_MESSAGE(2, "This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new CorruptRecordException(message);
+            }
+        }),
+    UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnknownTopicOrPartitionException(message);
+            }
+        }),
+    INVALID_FETCH_SIZE(4, "The requested fetch size is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidFetchSizeException(message);
+            }
+        }),
+    LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new LeaderNotAvailableException(message);
+            }
+        }),
+    NOT_LEADER_FOR_PARTITION(6, "This server is not the leader for that topic-partition.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotLeaderForPartitionException(message);
+            }
+        }),
+    REQUEST_TIMED_OUT(7, "The request timed out.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new TimeoutException(message);
+            }
+        }),
+    BROKER_NOT_AVAILABLE(8, "The broker is not available.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new BrokerNotAvailableException(message);
+            }
+        }),
+    REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ReplicaNotAvailableException(message);
+            }
+        }),
+    MESSAGE_TOO_LARGE(10, "The request included a message larger than the max message size the server will accept.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new RecordTooLargeException(message);
+            }
+        }),
+    STALE_CONTROLLER_EPOCH(11, "The controller moved to another broker.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ControllerMovedException(message);
+            }
+        }),
+    OFFSET_METADATA_TOO_LARGE(12, "The metadata field of the offset request was too large.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new OffsetMetadataTooLarge(message);
+            }
+        }),
+    NETWORK_EXCEPTION(13, "The server disconnected before a response was received.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NetworkException(message);
+            }
+        }),
+    COORDINATOR_LOAD_IN_PROGRESS(14, "The coordinator is loading and hence can't process requests.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new CoordinatorLoadInProgressException(message);
+            }
+        }),
+    COORDINATOR_NOT_AVAILABLE(15, "The coordinator is not available.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new CoordinatorNotAvailableException(message);
+            }
+        }),
+    NOT_COORDINATOR(16, "This is not the correct coordinator.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotCoordinatorException(message);
+            }
+        }),
+    INVALID_TOPIC_EXCEPTION(17, "The request attempted to perform an operation on an invalid topic.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidTopicException(message);
+            }
+        }),
+    RECORD_LIST_TOO_LARGE(18, "The request included a message batch larger than the configured segment size on the server.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new RecordBatchTooLargeException(message);
+            }
+        }),
+    NOT_ENOUGH_REPLICAS(19, "Messages are rejected since there are fewer in-sync replicas than required.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotEnoughReplicasException(message);
+            }
+        }),
+    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, "Messages are written to the log, but to fewer in-sync replicas than required.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotEnoughReplicasAfterAppendException(message);
+            }
+        }),
+    INVALID_REQUIRED_ACKS(21, "Produce request specified an invalid value for required acks.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidRequiredAcksException(message);
+            }
+        }),
+    ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new IllegalGenerationException(message);
+            }
+        }),
     INCONSISTENT_GROUP_PROTOCOL(23,
-            new InconsistentGroupProtocolException("The group member's supported protocols are incompatible with those of existing members.")),
-    INVALID_GROUP_ID(24,
-            new InvalidGroupIdException("The configured groupId is invalid")),
-    UNKNOWN_MEMBER_ID(25,
-            new UnknownMemberIdException("The coordinator is not aware of this member.")),
+            "The group member's supported protocols are incompatible with those of existing members.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InconsistentGroupProtocolException(message);
+            }
+        }),
+    INVALID_GROUP_ID(24, "The configured groupId is invalid",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidGroupIdException(message);
+            }
+        }),
+    UNKNOWN_MEMBER_ID(25, "The coordinator is not aware of this member.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnknownMemberIdException(message);
+            }
+        }),
     INVALID_SESSION_TIMEOUT(26,
-            new InvalidSessionTimeoutException("The session timeout is not within the range allowed by the broker " +
-                    "(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).")),
-    REBALANCE_IN_PROGRESS(27,
-            new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")),
-    INVALID_COMMIT_OFFSET_SIZE(28,
-            new InvalidCommitOffsetSizeException("The committing offset data size is not valid")),
-    TOPIC_AUTHORIZATION_FAILED(29,
-            new TopicAuthorizationException("Topic authorization failed.")),
-    GROUP_AUTHORIZATION_FAILED(30,
-            new GroupAuthorizationException("Group authorization failed.")),
-    CLUSTER_AUTHORIZATION_FAILED(31,
-            new ClusterAuthorizationException("Cluster authorization failed.")),
-    INVALID_TIMESTAMP(32,
-            new InvalidTimestampException("The timestamp of the message is out of acceptable range.")),
-    UNSUPPORTED_SASL_MECHANISM(33,
-            new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")),
-    ILLEGAL_SASL_STATE(34,
-            new IllegalSaslStateException("Request is not valid given the current SASL state.")),
-    UNSUPPORTED_VERSION(35,
-            new UnsupportedVersionException("The version of the API is not supported.")),
-    TOPIC_ALREADY_EXISTS(36,
-            new TopicExistsException("Topic with this name already exists.")),
-    INVALID_PARTITIONS(37,
-            new InvalidPartitionsException("Number of partitions is invalid.")),
-    INVALID_REPLICATION_FACTOR(38,
-            new InvalidReplicationFactorException("Replication-factor is invalid.")),
-    INVALID_REPLICA_ASSIGNMENT(39,
-            new InvalidReplicaAssignmentException("Replica assignment is invalid.")),
-    INVALID_CONFIG(40,
-            new InvalidConfigurationException("Configuration is invalid.")),
-    NOT_CONTROLLER(41,
-        new NotControllerException("This is not the correct controller for this cluster.")),
-    INVALID_REQUEST(42,
-        new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" +
-            " the message was sent to an incompatible broker. See the broker logs for more details.")),
-    UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
-        new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
-    POLICY_VIOLATION(44,
-        new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
-    OUT_OF_ORDER_SEQUENCE_NUMBER(45,
-        new OutOfOrderSequenceException("The broker received an out of order sequence number")),
-    DUPLICATE_SEQUENCE_NUMBER(46,
-        new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
-    INVALID_PRODUCER_EPOCH(47,
-        new ProducerFencedException("Producer attempted an operation with an old epoch")),
-    INVALID_TXN_STATE(48,
-        new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")),
-    INVALID_PID_MAPPING(49,
-        new InvalidPidMappingException("The PID mapping is invalid")),
-    INVALID_TRANSACTION_TIMEOUT(50,
-        new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " +
-            "(as configured by max.transaction.timeout.ms).")),
-    CONCURRENT_TRANSACTIONS(51,
-        new ConcurrentTransactionsException("The producer attempted to update a transaction " +
-             "while another concurrent operation on the same transaction was ongoing"));
+            "The session timeout is not within the range allowed by the broker " +
+            "(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidSessionTimeoutException(message);
+            }
+        }),
+    REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new RebalanceInProgressException(message);
+            }
+        }),
+    INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not valid",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidCommitOffsetSizeException(message);
+            }
+        }),
+    TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new TopicAuthorizationException(message);
+            }
+        }),
+    GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new GroupAuthorizationException(message);
+            }
+        }),
+    CLUSTER_AUTHORIZATION_FAILED(31, "Cluster authorization failed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ClusterAuthorizationException(message);
+            }
+        }),
+    INVALID_TIMESTAMP(32, "The timestamp of the message is out of acceptable range.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidTimestampException(message);
+            }
+        }),
+    UNSUPPORTED_SASL_MECHANISM(33, "The broker does not support the requested SASL mechanism.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnsupportedSaslMechanismException(message);
+            }
+        }),
+    ILLEGAL_SASL_STATE(34, "Request is not valid given the current SASL state.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new IllegalSaslStateException(message);
+            }
+        }),
+    UNSUPPORTED_VERSION(35, "The version of the API is not supported.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnsupportedVersionException(message);
+            }
+        }),
+    TOPIC_ALREADY_EXISTS(36, "Topic with this name already exists.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new TopicExistsException(message);
+            }
+        }),
+    INVALID_PARTITIONS(37, "Number of partitions is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidPartitionsException(message);
+            }
+        }),
+    INVALID_REPLICATION_FACTOR(38, "Replication-factor is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidReplicationFactorException(message);
+            }
+        }),
+    INVALID_REPLICA_ASSIGNMENT(39, "Replica assignment is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidReplicaAssignmentException(message);
+            }
+        }),
+    INVALID_CONFIG(40, "Configuration is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidConfigurationException(message);
+            }
+        }),
+    NOT_CONTROLLER(41, "This is not the correct controller for this cluster.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotControllerException(message);
+            }
+        }),
+    INVALID_REQUEST(42, "This most likely occurs because of a request being malformed by the " +
+                "client library or the message was sent to an incompatible broker. See the broker logs " +
+                "for more details.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidRequestException(message);
+            }
+        }),
+    UNSUPPORTED_FOR_MESSAGE_FORMAT(43, "The message format version on the broker does not support the request.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnsupportedForMessageFormatException(message);
+            }
+        }),
+    POLICY_VIOLATION(44, "Request parameters do not satisfy the configured policy.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new PolicyViolationException(message);
+            }
+        }),
+    OUT_OF_ORDER_SEQUENCE_NUMBER(45, "The broker received an out of order sequence number",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new OutOfOrderSequenceException(message);
+            }
+        }),
+    DUPLICATE_SEQUENCE_NUMBER(46, "The broker received a duplicate sequence number",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new DuplicateSequenceNumberException(message);
+            }
+        }),
+    INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ProducerFencedException(message);
+            }
+        }),
+    INVALID_TXN_STATE(48, "The producer attempted a transactional operation in an invalid state",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidTxnStateException(message);
+            }
+        }),
+    INVALID_PID_MAPPING(49, "The PID mapping is invalid",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidPidMappingException(message);
+            }
+        }),
+    INVALID_TRANSACTION_TIMEOUT(50, "The transaction timeout is larger than the maximum value allowed by " +
+                "the broker (as configured by max.transaction.timeout.ms).",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidTxnTimeoutException(message);
+            }
+        }),
+    CONCURRENT_TRANSACTIONS(51, "The producer attempted to update a transaction " +
+                "while another concurrent operation on the same transaction was ongoing",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ConcurrentTransactionsException(message);
+            }
+        });
+
+    private interface ApiExceptionBuilder {
+        ApiException build(String message);
+    }
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
@@ -206,11 +480,13 @@ public enum Errors {
     }
 
     private final short code;
+    private final ApiExceptionBuilder builder;
     private final ApiException exception;
 
-    Errors(int code, ApiException exception) {
+    Errors(int code, String defaultExceptionString, ApiExceptionBuilder builder) {
         this.code = (short) code;
-        this.exception = exception;
+        this.builder = builder;
+        this.exception = builder.build(defaultExceptionString);
     }
 
     /**
@@ -221,6 +497,21 @@ public enum Errors {
     }
 
     /**
+     * Create an instance of the ApiException that contains the given error message.
+     *
+     * @param message    The message string to set.
+     * @return           The exception.
+     */
+    public ApiException exception(String message) {
+        if (message == null) {
+            // If no error message was specified, return an exception with the default error message.
+            return exception;
+        }
+        // Return an exception with the given error message.
+        return builder.build(message);
+    }
+
+    /**
      * Returns the class name of the exception or null if this is {@code Errors.NONE}.
      */
     public String exceptionName() {

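The change above replaces each constant's pre-built ApiException with a default
message plus an ApiExceptionBuilder, so callers can attach a request-specific
message while still falling back to the default. A minimal usage sketch (the
topic name and message are illustrative):

    import org.apache.kafka.common.errors.ApiException;
    import org.apache.kafka.common.protocol.Errors;

    public class ErrorsSketch {
        public static void main(String[] args) {
            // Default message, exactly as before this change.
            ApiException generic = Errors.UNKNOWN_TOPIC_OR_PARTITION.exception();
            // Request-specific message, enabled by this change. Passing null
            // returns the cached default-message instance instead.
            ApiException detailed =
                Errors.UNKNOWN_TOPIC_OR_PARTITION.exception("Topic 'my-topic' was not found.");
            System.out.println(generic.getMessage());
            System.out.println(detailed.getMessage());
        }
    }
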
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 54f9764..2c2b2dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -61,6 +62,10 @@ public class CreateTopicsResponse extends AbstractResponse {
             return message;
         }
 
+        public ApiException exception() {
+            return error.exception(message);
+        }
+
         @Override
         public String toString() {
             return "Error(error=" + error + ", message=" + message + ")";

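The new exception() accessor combines the per-topic error code with the
broker-supplied message; for Errors.NONE it returns null, since that
constant's builder builds no exception. A sketch of how a per-topic future
might be completed from it (the helper is hypothetical, not part of this
patch):

    import org.apache.kafka.common.errors.ApiException;
    import org.apache.kafka.common.internals.KafkaFutureImpl;
    import org.apache.kafka.common.requests.CreateTopicsResponse;

    final class TopicFutures {
        // Hypothetical helper: complete one topic's future from its response error.
        static void complete(KafkaFutureImpl<Void> future, CreateTopicsResponse.Error error) {
            ApiException e = error.exception();
            if (e == null)
                future.complete(null);           // Errors.NONE: success
            else
                future.completeExceptionally(e); // carries the broker's message
        }
    }
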
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index bd79653..017fdf4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -260,7 +260,8 @@ public class MetadataResponse extends AbstractResponse {
             }
         }
 
-        return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), internalTopics);
+        return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
+                internalTopics, this.controller);
     }
 
     /**

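With this change the Cluster built from a metadata response also carries the
controller node, so callers can discover which broker is currently the
controller. Assuming a matching controller() accessor on Cluster (added
alongside this constructor change, not shown in this hunk):

    Node controller = metadataResponse.cluster().controller();
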
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 1a4de98..cb0ff89 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -659,7 +659,7 @@ public class Utils {
     /**
      * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
      */
-    public static void closeQuietly(Closeable closeable, String name) {
+    public static void closeQuietly(AutoCloseable closeable, String name) {
         if (closeable != null) {
             try {
                 closeable.close();

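Widening the parameter from Closeable to AutoCloseable lets the same helper
close resources that implement only AutoCloseable, such as the new
AdminClient; the integration test at the end of this patch uses it exactly
that way:

    Utils.closeQuietly(client, "AdminClient")
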
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
new file mode 100644
index 0000000..36cb8e8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.CreateTopicsResponse.Error;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * A unit test for KafkaAdminClient.
+ *
+ * See KafkaAdminClientIntegrationTest for an integration test of the admin client.
+ */
+public class KafkaAdminClientTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testGetOrCreateListValue() {
+        Map<String, List<String>> map = new HashMap<>();
+        List<String> fooList = KafkaAdminClient.getOrCreateListValue(map, "foo");
+        assertNotNull(fooList);
+        fooList.add("a");
+        fooList.add("b");
+        List<String> fooList2 = KafkaAdminClient.getOrCreateListValue(map, "foo");
+        assertEquals(fooList, fooList2);
+        assertTrue(fooList2.contains("a"));
+        assertTrue(fooList2.contains("b"));
+        List<String> barList = KafkaAdminClient.getOrCreateListValue(map, "bar");
+        assertNotNull(barList);
+        assertTrue(barList.isEmpty());
+    }
+
+    @Test
+    public void testCalcTimeoutMsRemainingAsInt() {
+        assertEquals(0, KafkaAdminClient.calcTimeoutMsRemainingAsInt(1000, 1000));
+        assertEquals(100, KafkaAdminClient.calcTimeoutMsRemainingAsInt(1000, 1100));
+        assertEquals(Integer.MAX_VALUE, KafkaAdminClient.calcTimeoutMsRemainingAsInt(0, Long.MAX_VALUE));
+        assertEquals(Integer.MIN_VALUE, KafkaAdminClient.calcTimeoutMsRemainingAsInt(Long.MAX_VALUE, 0));
+    }
+
+    @Test
+    public void testPrettyPrintException() {
+        assertEquals("Null exception.", KafkaAdminClient.prettyPrintException(null));
+        assertEquals("TimeoutException", KafkaAdminClient.prettyPrintException(new TimeoutException()));
+        assertEquals("TimeoutException: The foobar timed out.",
+            KafkaAdminClient.prettyPrintException(new TimeoutException("The foobar timed out.")));
+    }
+
+    private static Map<String, Object> newStrMap(String... vals) {
+        Map<String, Object> map = new HashMap<>();
+        map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
+        map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
+        if (vals.length % 2 != 0) {
+            throw new IllegalStateException();
+        }
+        for (int i = 0; i < vals.length; i += 2) {
+            map.put(vals[i], vals[i + 1]);
+        }
+        return map;
+    }
+
+    private static AdminClientConfig newConfMap(String... vals) {
+        return new AdminClientConfig(newStrMap(vals));
+    }
+
+    @Test
+    public void testGenerateClientId() {
+        Set<String> ids = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            String id = KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, ""));
+            assertTrue("Got duplicate id " + id, !ids.contains(id));
+            ids.add(id);
+        }
+        assertEquals("myCustomId",
+            KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
+    }
+
+    private static class MockKafkaAdminClientContext implements AutoCloseable {
+        final static String CLUSTER_ID = "mockClusterId";
+        final AdminClientConfig adminClientConfig;
+        final Metadata metadata;
+        final HashMap<Integer, Node> nodes;
+        final MockClient mockClient;
+        final AdminClient client;
+        Cluster cluster;
+
+        MockKafkaAdminClientContext(Map<String, Object> config) {
+            this.adminClientConfig = new AdminClientConfig(config);
+            this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+            this.nodes = new HashMap<Integer, Node>();
+            this.nodes.put(0, new Node(0, "localhost", 8121));
+            this.nodes.put(1, new Node(1, "localhost", 8122));
+            this.nodes.put(2, new Node(2, "localhost", 8123));
+            this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
+            this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
+            this.cluster = new Cluster(CLUSTER_ID,  nodes.values(),
+                Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+        }
+
+        @Override
+        public void close() {
+            this.client.close();
+        }
+    }
+
+    @Test
+    public void testCloseAdminClient() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+        }
+    }
+
+    private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
+        throws InterruptedException {
+        try {
+            future.get();
+            fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
+        } catch (ExecutionException ee) {
+            Throwable cause = ee.getCause();
+            assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
+                cause.getClass().getSimpleName(),
+                exceptionClass, cause.getClass());
+        }
+    }
+
+    /**
+     * Test that the client properly times out when we don't receive any metadata.
+     */
+    @Test
+    public void testTimeoutWithoutMetadata() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap(
+            AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+            ctx.mockClient.setNode(new Node(0, "localhost", 8121));
+            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
+                    put("myTopic", new Error(Errors.NONE, ""));
+                }}));
+            KafkaFuture<Void> future = ctx.client.
+                createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
+                        put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
+                    }})), new CreateTopicsOptions().timeoutMs(1000)).all();
+            assertFutureError(future, TimeoutException.class);
+        }
+    }
+
+    @Test
+    public void testCreateTopics() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
+            ctx.mockClient.setNode(ctx.nodes.get(0));
+            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
+                    put("myTopic", new Error(Errors.NONE, ""));
+                }}));
+            KafkaFuture<Void> future = ctx.client.
+                createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
+                        put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
+                    }})), new CreateTopicsOptions().timeoutMs(10000)).all();
+            future.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
new file mode 100644
index 0000000..39868e0
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A unit test for KafkaFuture.
+ */
+public class KafkaFutureTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testCompleteFutures() throws Exception {
+        KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
+        assertTrue(future123.complete(123));
+        assertEquals(Integer.valueOf(123), future123.get());
+        assertFalse(future123.complete(456));
+        assertTrue(future123.isDone());
+        assertFalse(future123.isCancelled());
+        assertFalse(future123.isCompletedExceptionally());
+
+        KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
+        assertEquals(Integer.valueOf(456), future456.get());
+
+        KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
+        futureFail.completeExceptionally(new RuntimeException("We require more vespene gas"));
+        try {
+            futureFail.get();
+            Assert.fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(RuntimeException.class, e.getCause().getClass());
+            Assert.assertEquals("We require more vespene gas", e.getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void testCompletingFutures() throws Exception {
+        final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        CompleterThread myThread = new CompleterThread(future, "You must construct additional pylons.");
+        assertFalse(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+        assertEquals("I am ready", future.getNow("I am ready"));
+        myThread.start();
+        String str = future.get(5, TimeUnit.MINUTES);
+        assertEquals("You must construct additional pylons.", str);
+        assertEquals("You must construct additional pylons.", future.getNow("I am ready"));
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+        myThread.join();
+        assertEquals(null, myThread.testException);
+    }
+
+    private static class CompleterThread<T> extends Thread {
+        private final KafkaFutureImpl<T> future;
+        private final T value;
+        Throwable testException = null;
+
+        CompleterThread(KafkaFutureImpl<T> future, T value) {
+            this.future = future;
+            this.value = value;
+        }
+
+        @Override
+        public void run() {
+            try {
+                try {
+                    Thread.sleep(0, 200);
+                } catch (InterruptedException e) {
+                }
+                future.complete(value);
+            } catch (Throwable testException) {
+                this.testException = testException;
+            }
+        }
+    }
+
+    private static class WaiterThread<T> extends Thread {
+        private final KafkaFutureImpl<T> future;
+        private final T expected;
+        Throwable testException = null;
+
+        WaiterThread(KafkaFutureImpl<T> future, T expected) {
+            this.future = future;
+            this.expected = expected;
+        }
+
+        @Override
+        public void run() {
+            try {
+                T value = future.get();
+                assertEquals(expected, value);
+            } catch (Throwable testException) {
+                this.testException = testException;
+            }
+        }
+    }
+
+    @Test
+    public void testAllOfFutures() throws Exception {
+        final int numThreads = 5;
+        final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
+        for (int i = 0; i < numThreads; i++) {
+            futures.add(new KafkaFutureImpl<Integer>());
+        }
+        KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
+        final List<CompleterThread> completerThreads = new ArrayList<>();
+        final List<WaiterThread> waiterThreads = new ArrayList<>();
+        for (int i = 0; i < numThreads; i++) {
+            completerThreads.add(new CompleterThread<>(futures.get(i), i));
+            waiterThreads.add(new WaiterThread<>(futures.get(i), i));
+        }
+        assertFalse(allFuture.isDone());
+        for (int i = 0; i < numThreads; i++) {
+            waiterThreads.get(i).start();
+        }
+        for (int i = 0; i < numThreads - 1; i++) {
+            completerThreads.get(i).start();
+        }
+        assertFalse(allFuture.isDone());
+        completerThreads.get(numThreads - 1).start();
+        allFuture.get();
+        assertTrue(allFuture.isDone());
+        for (int i = 0; i < numThreads; i++) {
+            assertEquals(Integer.valueOf(i), futures.get(i).get());
+        }
+        for (int i = 0; i < numThreads; i++) {
+            completerThreads.get(i).join();
+            waiterThreads.get(i).join();
+            assertEquals(null, completerThreads.get(i).testException);
+            assertEquals(null, waiterThreads.get(i).testException);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7342bbb..d297b70 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -40,6 +40,11 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 import scala.collection.JavaConverters._
 import scala.util.Try
 
+/**
+  * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
+  * and configurations.  This client is deprecated, and will be replaced by KafkaAdminClient.
+  * @see KafkaAdminClient
+  */
 class AdminClient(val time: Time,
                   val requestTimeoutMs: Int,
                   val retryBackoffMs: Long,

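For readers migrating off this Scala client, a minimal sketch of the Java
replacement, assuming AdminClient.create accepts the same Map<String, Object>
configuration that the integration test's createConfig() builds below (the
bootstrap address is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class AdminClientSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = new HashMap<>();
            conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            AdminClient client = AdminClient.create(conf);
            try {
                // Block on the names() future, as the integration test below does.
                System.out.println(client.listTopics().names().get());
            } finally {
                client.close();
            }
        }
    }
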
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
deleted file mode 100644
index ff9fef0..0000000
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util.Collections
-import java.util.concurrent.TimeUnit
-
-import kafka.admin.AdminClient
-import kafka.admin.AdminClient.DeleteRecordsResult
-import kafka.server.KafkaConfig
-import java.lang.{Long => JLong}
-import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{Errors, ApiKeys}
-import org.apache.kafka.common.requests.DeleteRecordsRequest
-import org.junit.{After, Before, Test}
-import org.junit.Assert._
-import scala.collection.JavaConverters._
-
-class AdminClientTest extends IntegrationTestHarness with Logging {
-
-  val producerCount = 1
-  val consumerCount = 2
-  val serverCount = 3
-  val groupId = "my-test"
-  val clientId = "consumer-498"
-
-  val topic = "topic"
-  val part = 0
-  val tp = new TopicPartition(topic, part)
-  val part2 = 1
-  val tp2 = new TopicPartition(topic, part2)
-
-  var client: AdminClient = null
-
-  // configure the servers and clients
-  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
-  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
-  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    client = AdminClient.createSimplePlaintext(this.brokerList)
-    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
-  }
-
-  @After
-  override def tearDown() {
-    client.close()
-    super.tearDown()
-  }
-
-  @Test
-  def testSeekToBeginningAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(0L, consumer.position(tp))
-
-    client.deleteRecordsBefore(Map((tp, 5L))).get()
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(5L, consumer.position(tp))
-
-    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(10L, consumer.position(tp))
-  }
-
-  @Test
-  def testConsumeAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    var messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 10
-    }, "Expected 10 messages", 3000L)
-
-    client.deleteRecordsBefore(Map((tp, 3L))).get()
-    consumer.seek(tp, 1)
-    messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 7
-    }, "Expected 7 messages", 3000L)
-
-    client.deleteRecordsBefore(Map((tp, 8L))).get()
-    consumer.seek(tp, 1)
-    messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 2
-    }, "Expected 2 messages", 3000L)
-  }
-
-  @Test
-  def testLogStartOffsetCheckpoint() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-
-    for (i <- 0 until serverCount)
-      killBroker(i)
-    restartDeadBrokers()
-
-    client.close()
-    brokerList = TestUtils.bootstrapServers(servers, listenerName)
-    client = AdminClient.createSimplePlaintext(brokerList)
-
-    TestUtils.waitUntilTrue(() => {
-      // Need to retry if leader is not available for the partition
-      client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
-    }, "Expected low watermark of the partition to be 5L")
-  }
-
-  @Test
-  def testLogStartOffsetAfterDeleteRecords() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    client.deleteRecordsBefore(Map((tp, 3L))).get()
-
-    for (i <- 0 until serverCount)
-      assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
-  }
-
-  @Test
-  def testOffsetsForTimesAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
-
-    client.deleteRecordsBefore(Map((tp, 5L))).get()
-    assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
-
-    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
-    assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
-  }
-
-  @Test
-  def testDeleteRecordsWithException() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    // Should get success result
-    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-    // OffsetOutOfRangeException if offset > high_watermark
-    assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
-
-    val nonExistPartition = new TopicPartition(topic, 3)
-    // UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition
-    assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
-                 client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
-  }
-
-  @Test
-  def testListGroups() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    val groups = client.listAllGroupsFlattened
-    assertFalse(groups.isEmpty)
-    val group = groups.head
-    assertEquals(groupId, group.groupId)
-    assertEquals("consumer", group.protocolType)
-  }
-
-  @Test
-  def testListAllBrokerVersionInfo() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    val brokerVersionInfos = client.listAllBrokerVersionInfo
-    val brokers = brokerList.split(",")
-    assertEquals(brokers.size, brokerVersionInfos.size)
-    for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
-      val hostStr = s"${node.host}:${node.port}"
-      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
-      val brokerVersionInfo = tryBrokerVersionInfo.get
-      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
-    }
-  }
-
-  @Test
-  def testGetConsumerGroupSummary() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    val group = client.describeConsumerGroup(groupId)
-    assertEquals("range", group.assignmentStrategy)
-    assertEquals("Stable", group.state)
-    assertFalse(group.consumers.isEmpty)
-
-    val member = group.consumers.get.head
-    assertEquals(clientId, member.clientId)
-    assertFalse(member.host.isEmpty)
-    assertFalse(member.consumerId.isEmpty)
-  }
-
-  @Test
-  def testDescribeConsumerGroup() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    val consumerGroupSummary = client.describeConsumerGroup(groupId)
-    assertEquals(1, consumerGroupSummary.consumers.get.size)
-    assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
-  }
-
-  @Test
-  def testDescribeConsumerGroupForNonExistentGroup() {
-    val nonExistentGroup = "non" + groupId
-    assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
-  }
-
-  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
-    consumer.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumer.poll(0)
-      !consumer.assignment.isEmpty
-    }, "Expected non-empty assignment")
-  }
-
-  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
-                          numRecords: Int,
-                          tp: TopicPartition) {
-    val futures = (0 until numRecords).map { i =>
-      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
-      debug(s"Sending this record: $record")
-      producer.send(record)
-    }
-
-    futures.foreach(_.get)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
new file mode 100644
index 0000000..9c5468f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import org.apache.kafka.common.utils.Utils
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import org.apache.kafka.clients.admin._
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.common.KafkaFuture
+import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.{After, Rule, Test}
+import org.junit.rules.Timeout
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+/**
+ * An integration test of the KafkaAdminClient.
+ *
+ * Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
+ */
+class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
+
+  @Rule
+  def globalTimeout = Timeout.millis(120000)
+
+  var client: AdminClient = null
+
+  @After
+  def closeClient(): Unit = {
+    if (client != null)
+      Utils.closeQuietly(client, "AdminClient")
+  }
+
+  val brokerCount = 3
+  lazy val serverConfig = new Properties
+
+  def createConfig(): util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
+  def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
+    TestUtils.waitUntilTrue(() => {
+        val topics = client.listTopics().names().get()
+        expectedPresent.forall(topicName => topics.contains(topicName)) &&
+          expectedMissing.forall(topicName => !topics.contains(topicName))
+      }, "timed out waiting for topics")
+  }
+
+  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
+    try {
+      future.get()
+      fail("Expected CompletableFuture.get to return an exception")
+    } catch {
+      case e: ExecutionException =>
+        val cause = e.getCause()
+        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
+            cause.getClass().getName, clazz.isInstance(cause))
+    }
+  }
+
+  @Test
+  def testClose(): Unit = {
+    val client = AdminClient.create(createConfig())
+    client.close()
+    client.close() // double close has no effect
+  }
+
+  @Test
+  def testListNodes(): Unit = {
+    client = AdminClient.create(createConfig())
+    val brokerStrs = brokerList.split(",").toList.sorted
+    var nodeStrs: List[String] = null
+    do {
+      val nodes = client.describeCluster().nodes().get().asScala
+      nodeStrs = nodes.map(node => s"${node.host}:${node.port}").toList.sorted
+    } while (nodeStrs.size < brokerStrs.size)
+    assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
+    client.close()
+  }
+
+  @Test
+  def testCreateDeleteTopics(): Unit = {
+    client = AdminClient.create(createConfig())
+    val newTopics: List[NewTopic] = List(
+        new NewTopic("mytopic", 1, 1),
+        new NewTopic("mytopic2", 1, 1))
+    client.createTopics(newTopics.asJava,
+      new CreateTopicsOptions().validateOnly(true)).all().get()
+    waitForTopics(client, List(), List("mytopic", "mytopic2"))
+
+    client.createTopics(newTopics.asJava).all().get()
+    waitForTopics(client, List("mytopic", "mytopic2"), List())
+
+    val results = client.createTopics(newTopics.asJava).results()
+    assert(results.containsKey("mytopic"))
+    assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
+    assert(results.containsKey("mytopic2"))
+    assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
+
+    val deleteTopics: Set[String] = Set("mytopic", "mytopic2")
+    client.deleteTopics(deleteTopics.asJava).all().get()
+    waitForTopics(client, List(), List("mytopic", "mytopic2"))
+
+    client.close()
+  }
+
+  @Test
+  def testGetAllBrokerVersions(): Unit = {
+    client = AdminClient.create(createConfig())
+    val nodes = client.describeCluster().nodes().get()
+    val nodesToVersions = client.apiVersions(nodes).all().get()
+    val brokers = brokerList.split(",")
+    assert(brokers.size == nodesToVersions.size())
+    for ((node, brokerVersionInfo) <- nodesToVersions.asScala) {
+      val hostStr = s"${node.host}:${node.port}"
+      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+    }
+    client.close()
+  }
+
+  override def generateConfigs() = {
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+    cfgs.foreach { config =>
+      config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
+      config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
+      config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
+      config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
+      config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+    }
+    cfgs.foreach(_.putAll(serverConfig))
+    cfgs.map(KafkaConfig.fromProps)
+  }
+}
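
For readers skimming the patch, here is a minimal sketch of driving the new AdminClient outside the test harness, using only calls exercised in the test above (create, createTopics, listTopics, deleteTopics). The bootstrap address and the topic name "demo" are placeholder assumptions, not part of the patch:

import java.util
import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object AdminClientSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder bootstrap address; point this at a real broker.
    val config = new util.HashMap[String, Object]
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val client = AdminClient.create(config)
    try {
      // Create a single-partition topic with replication factor 1;
      // all() returns a future that completes once every create has finished.
      client.createTopics(Collections.singleton(new NewTopic("demo", 1, 1))).all().get()
      // names() resolves to the set of topic names visible to this client.
      println(client.listTopics().names().get())
      client.deleteTopics(Collections.singleton("demo")).all().get()
    } finally {
      client.close()
    }
  }
}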

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
new file mode 100644
index 0000000..434a47f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+
+import kafka.admin.AdminClient
+import kafka.admin.AdminClient.DeleteRecordsResult
+import kafka.server.KafkaConfig
+import java.lang.{Long => JLong}
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import scala.collection.JavaConverters._
+
+/**
+  * Tests for the deprecated Scala AdminClient.
+  */
+class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
+
+  val producerCount = 1
+  val consumerCount = 2
+  val serverCount = 3
+  val groupId = "my-test"
+  val clientId = "consumer-498"
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  val part2 = 1
+  val tp2 = new TopicPartition(topic, part2)
+
+  var client: AdminClient = null
+
+  // configure the servers and clients
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // avoid losing committed offsets when a broker is killed
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // allow very short consumer session timeouts
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    client = AdminClient.createSimplePlaintext(this.brokerList)
+    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+  }
+
+  @After
+  override def tearDown() {
+    client.close()
+    super.tearDown()
+  }
+
+  @Test
+  def testSeekToBeginningAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(0L, consumer.position(tp))
+
+    client.deleteRecordsBefore(Map((tp, 5L))).get()
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(5L, consumer.position(tp))
+
+    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(10L, consumer.position(tp))
+  }
+
+  @Test
+  def testConsumeAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    var messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 10
+    }, "Expected 10 messages", 3000L)
+
+    client.deleteRecordsBefore(Map((tp, 3L))).get()
+    consumer.seek(tp, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 7
+    }, "Expected 7 messages", 3000L)
+
+    client.deleteRecordsBefore(Map((tp, 8L))).get()
+    consumer.seek(tp, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 2
+    }, "Expected 2 messages", 3000L)
+  }
+
+  @Test
+  def testLogStartOffsetCheckpoint() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+
+    for (i <- 0 until serverCount)
+      killBroker(i)
+    restartDeadBrokers()
+
+    client.close()
+    brokerList = TestUtils.bootstrapServers(servers, listenerName)
+    client = AdminClient.createSimplePlaintext(brokerList)
+
+    TestUtils.waitUntilTrue(() => {
+      // Need to retry if leader is not available for the partition
+      client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
+    }, "Expected low watermark of the partition to be 5L")
+  }
+
+  @Test
+  def testLogStartOffsetAfterDeleteRecords() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    client.deleteRecordsBefore(Map((tp, 3L))).get()
+
+    for (i <- 0 until serverCount)
+      assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
+  }
+
+  @Test
+  def testOffsetsForTimesAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+    client.deleteRecordsBefore(Map((tp, 5L))).get()
+    assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+    assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
+  }
+
+  @Test
+  def testDeleteRecordsWithException() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    // Should get success result
+    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+    // OffsetOutOfRangeException if offset > high_watermark
+    assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
+
+    val nonExistPartition = new TopicPartition(topic, 3)
+    // LeaderNotAvailableException is expected when deleting records of a non-existent partition, since no leader metadata is available for it
+    assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
+                 client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
+  }
+
+  @Test
+  def testListGroups() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    val groups = client.listAllGroupsFlattened
+    assertFalse(groups.isEmpty)
+    val group = groups.head
+    assertEquals(groupId, group.groupId)
+    assertEquals("consumer", group.protocolType)
+  }
+
+  @Test
+  def testListAllBrokerVersionInfo() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    val brokerVersionInfos = client.listAllBrokerVersionInfo
+    val brokers = brokerList.split(",")
+    assertEquals(brokers.size, brokerVersionInfos.size)
+    for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
+      val hostStr = s"${node.host}:${node.port}"
+      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+      val brokerVersionInfo = tryBrokerVersionInfo.get
+      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+    }
+  }
+
+  @Test
+  def testGetConsumerGroupSummary() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    val group = client.describeConsumerGroup(groupId)
+    assertEquals("range", group.assignmentStrategy)
+    assertEquals("Stable", group.state)
+    assertFalse(group.consumers.isEmpty)
+
+    val member = group.consumers.get.head
+    assertEquals(clientId, member.clientId)
+    assertFalse(member.host.isEmpty)
+    assertFalse(member.consumerId.isEmpty)
+  }
+
+  @Test
+  def testDescribeConsumerGroup() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    val consumerGroupSummary = client.describeConsumerGroup(groupId)
+    assertEquals(1, consumerGroupSummary.consumers.get.size)
+    assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
+  }
+
+  @Test
+  def testDescribeConsumerGroupForNonExistentGroup() {
+    val nonExistentGroup = "non" + groupId
+    assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
+  }
+
+  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+    consumer.subscribe(Collections.singletonList(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0)
+      !consumer.assignment.isEmpty
+    }, "Expected non-empty assignment")
+  }
+
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                          numRecords: Int,
+                          tp: TopicPartition) {
+    val futures = (0 until numRecords).map { i =>
+      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
+      debug(s"Sending this record: $record")
+      producer.send(record)
+    }
+
+    futures.foreach(_.get)
+  }
+
+}
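
The deprecated Scala client's record-deletion path, as exercised by the test above, can be sketched as follows. The broker address, topic, and partition are placeholders, and the shape of the returned map is inferred from the DeleteRecordsResult assertions in this test:

import kafka.admin.AdminClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.DeleteRecordsRequest

object LegacyDeleteRecordsSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder bootstrap address, topic, and partition.
    val client = AdminClient.createSimplePlaintext("localhost:9092")
    val tp = new TopicPartition("topic", 0)
    try {
      // Delete everything before offset 5; get() blocks for a map of per-partition results.
      val results = client.deleteRecordsBefore(Map(tp -> 5L)).get()
      println(s"delete-records result for $tp: ${results(tp)}")
      // The HIGH_WATERMARK sentinel trims the partition all the way up to its high watermark.
      client.deleteRecordsBefore(Map(tp -> DeleteRecordsRequest.HIGH_WATERMARK)).get()
    } finally {
      client.close()
    }
  }
}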

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
new file mode 100644
index 0000000..f20ed0f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -0,0 +1,26 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.server.KafkaConfig
+
+class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslTestHarness {
+  override protected val zkSaslEnabled = true
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+
+}
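
Outside the harness, a rough equivalent of what TestUtils.adminClientSecurityConfigs assembles for a SASL_SSL client is sketched below. Every value is a placeholder, and the property names are the standard Kafka client security configs rather than anything introduced by this patch:

import java.util
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object SaslSslAdminClientSketch {
  def main(args: Array[String]): Unit = {
    val config = new util.HashMap[String, Object]
    // Placeholder endpoint; SASL_SSL listeners conventionally sit on a TLS port.
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093")
    config.put("security.protocol", "SASL_SSL")
    // Truststore holding the broker's certificate chain (placeholder path and password).
    config.put("ssl.truststore.location", "/path/to/truststore.jks")
    config.put("ssl.truststore.password", "truststore-password")
    // JAAS credentials for the chosen mechanism are supplied separately (e.g. via sasl.jaas.config).
    config.put("sasl.mechanism", "GSSAPI")
    val client = AdminClient.create(config)
    client.close()
  }
}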

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 02b5fe3..e826c7f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -561,6 +561,9 @@ object TestUtils extends Logging {
   def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
     securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties)
 
+  def adminClientSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "admin-client", saslProperties)
+
   /**
    * Create a new consumer with a few pre-configured properties.
    */

