kafka-commits mailing list archives

From gwens...@apache.org
Subject [2/2] kafka git commit: KAFKA-2073: migrate to client-side topic metadata request/response
Date Fri, 11 Mar 2016 19:12:05 GMT
KAFKA-2073: migrate to client-side topic metadata request/response

Author: Jason Gustafson <jason@confluent.io>
Author: Ismael Juma <ismael@juma.me.uk>
Author: hachikuji <jason@confluent.io>

Reviewers: Grant Henke, Ismael Juma, Gwen Shapira, Flavio Junqueira

Closes #988 from hachikuji/KAFKA-2073


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/764d8ca9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/764d8ca9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/764d8ca9

Branch: refs/heads/trunk
Commit: 764d8ca9eb0aba6099ba289a10f437e72b53ffec
Parents: 287cce2
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Mar 11 11:11:59 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Mar 11 11:11:59 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  12 +-
 .../clients/consumer/internals/Fetcher.java     |   7 +-
 .../main/java/org/apache/kafka/common/Node.java |  13 +-
 .../kafka/common/requests/MetadataRequest.java  |  20 +-
 .../kafka/common/requests/MetadataResponse.java | 272 ++++++++++++-------
 .../clients/consumer/internals/FetcherTest.java |  43 +--
 .../internals/DefaultPartitionerTest.java       |  14 +-
 .../common/requests/RequestResponseTest.java    |  22 +-
 .../main/scala/kafka/admin/AdminClient.scala    |   5 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  38 +--
 .../main/scala/kafka/api/TopicMetadata.scala    |   1 -
 .../scala/kafka/api/TopicMetadataRequest.scala  |  16 --
 core/src/main/scala/kafka/cluster/Broker.scala  |  16 +-
 .../scala/kafka/network/RequestChannel.scala    |   1 -
 .../src/main/scala/kafka/server/KafkaApis.scala | 165 ++++++-----
 .../main/scala/kafka/server/MetadataCache.scala | 209 +++++++-------
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   2 +-
 .../api/RequestResponseSerializationTest.scala  |  13 +-
 .../integration/BaseTopicMetadataTest.scala     |  20 +-
 .../unit/kafka/producer/ProducerTest.scala      |   2 +-
 .../unit/kafka/server/MetadataCacheTest.scala   | 199 ++++++++++++++
 22 files changed, 699 insertions(+), 393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d4c4069..4d01cde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -18,6 +18,7 @@ import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -35,6 +36,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
@@ -147,6 +149,9 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public boolean ready(Node node, long now) {
+        if (node.isEmpty())
+            throw new IllegalArgumentException("Cannot connect to empty node " + node);
+
         if (isReady(node, now))
             return true;
 
@@ -578,9 +583,10 @@ public class NetworkClient implements KafkaClient {
             MetadataResponse response = new MetadataResponse(body);
             Cluster cluster = response.cluster();
             // check if any topics metadata failed to get updated
-            if (response.errors().size() > 0) {
-                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
-            }
+            Map<String, Errors> errors = response.errors();
+            if (!errors.isEmpty())
+                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
+
             // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
             // created which means we will get errors and no nodes until it exists
             if (cluster.nodes().size() > 0) {
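
The change above has two parts: ready() now rejects the placeholder returned by Node.noNode() before attempting a connection, and the metadata handler caches response.errors() in a local because, in the reworked MetadataResponse further down, errors() derives a fresh map from the topic metadata on every call. A minimal self-contained sketch of that derived-map behavior (simplified types; the class and topic names here are illustrative, not Kafka's):

    import java.util.HashMap;
    import java.util.Map;

    public class DerivedErrorsSketch {
        // Stand-in for MetadataResponse.errors(): the map is rebuilt per call,
        // so callers should compute it once and reuse the result.
        static Map<String, String> errors(Map<String, String> topicToError) {
            Map<String, String> errors = new HashMap<>();
            for (Map.Entry<String, String> e : topicToError.entrySet())
                if (!"NONE".equals(e.getValue()))
                    errors.put(e.getKey(), e.getValue());
            return errors;
        }

        public static void main(String[] args) {
            Map<String, String> metadata = new HashMap<>();
            metadata.put("topic1", "NONE");
            metadata.put("topic2", "LEADER_NOT_AVAILABLE");
            Map<String, String> errs = errors(metadata); // compute once, reuse below
            if (!errs.isEmpty())
                System.out.println("metadata errors: " + errs);
        }
    }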

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index b4d5c02..802a2f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -215,13 +215,14 @@ public class Fetcher<K, V> {
                     throw new TopicAuthorizationException(unauthorizedTopics);
 
                 boolean shouldRetry = false;
-                if (!response.errors().isEmpty()) {
+                Map<String, Errors> errors = response.errors();
+                if (!errors.isEmpty()) {
                     // if there were errors, we need to check whether they were fatal or whether
                     // we should just retry
 
-                    log.debug("Topic metadata fetch included errors: {}", response.errors());
+                    log.debug("Topic metadata fetch included errors: {}", errors);
 
-                    for (Map.Entry<String, Errors> errorEntry : response.errors().entrySet()) {
+                    for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
                         String topic = errorEntry.getKey();
                         Errors error = errorEntry.getValue();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
index 24cf6f4..6c3fd0b 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -17,6 +17,8 @@ package org.apache.kafka.common;
  */
 public class Node {
 
+    private static final Node NO_NODE = new Node(-1, "", -1);
+
     private final int id;
     private final String idString;
     private final String host;
@@ -31,7 +33,16 @@ public class Node {
     }
 
     public static Node noNode() {
-        return new Node(-1, "", -1);
+        return NO_NODE;
+    }
+
+    /**
+     * Check whether this node is empty, which may be the case if noNode() is used as a placeholder
+     * in a response payload with an error.
+     * @return true if it is, false otherwise
+     */
+    public boolean isEmpty() {
+        return host == null || host.isEmpty() || port < 0;
     }
 
     /**
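
The NO_NODE singleton plus the isEmpty() check form a simple null-object pattern: noNode() hands out one shared placeholder instead of allocating per call, and isEmpty() lets callers such as NetworkClient.ready() detect it. A self-contained sketch of the same pattern (class name is illustrative):

    public class NodeSketch {
        private static final NodeSketch NO_NODE = new NodeSketch(-1, "", -1);

        private final int id;
        private final String host;
        private final int port;

        public NodeSketch(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        // One shared placeholder instead of a new allocation per call
        public static NodeSketch noNode() {
            return NO_NODE;
        }

        // Placeholder detection, mirroring the new Node.isEmpty()
        public boolean isEmpty() {
            return host == null || host.isEmpty() || port < 0;
        }

        public static void main(String[] args) {
            System.out.println(noNode().isEmpty());                           // true
            System.out.println(new NodeSketch(1, "broker1", 9092).isEmpty()); // false
        }
    }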

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index a6c249f..92d8c6d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -12,9 +12,7 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -24,9 +22,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class MetadataRequest extends AbstractRequest {
     
@@ -44,7 +40,7 @@ public class MetadataRequest extends AbstractRequest {
     public MetadataRequest(Struct struct) {
         super(struct);
         Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
-        topics = new ArrayList<String>();
+        topics = new ArrayList<>();
         for (Object topicObj: topicArray) {
             topics.add((String) topicObj);
         }
@@ -52,16 +48,16 @@ public class MetadataRequest extends AbstractRequest {
 
     @Override
     public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<String, Errors> topicErrors = new HashMap<String, Errors>();
-        for (String topic : topics) {
-            topicErrors.put(topic, Errors.forException(e));
-        }
+        List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
+        Errors error = Errors.forException(e);
+        List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
+
+        for (String topic : topics)
+            topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, partitions));
 
-        Cluster cluster = new Cluster(Collections.<Node>emptyList(), Collections.<PartitionInfo>emptyList(),
-                Collections.<String>emptySet());
         switch (versionId) {
             case 0:
-                return new MetadataResponse(cluster, topicErrors);
+                return new MetadataResponse(Collections.<Node>emptyList(), topicMetadatas);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
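
getErrorResponse() now answers a failed request with one TopicMetadata per requested topic, each carrying the mapped error and an empty partition list, instead of fabricating an empty Cluster plus a side map of errors. A minimal self-contained sketch of the shape of that response (simplified types; topic names are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ErrorResponseSketch {
        // Simplified stand-in for MetadataResponse.TopicMetadata
        static class TopicMetadata {
            final String error;
            final String topic;
            final List<Integer> partitions;

            TopicMetadata(String error, String topic, List<Integer> partitions) {
                this.error = error;
                this.topic = topic;
                this.partitions = partitions;
            }
        }

        // Mirror of the new getErrorResponse(): the same error for every
        // requested topic, with empty partition metadata throughout
        static List<TopicMetadata> errorResponse(List<String> topics, String error) {
            List<TopicMetadata> metadata = new ArrayList<>();
            for (String topic : topics)
                metadata.add(new TopicMetadata(error, topic, Collections.<Integer>emptyList()));
            return metadata;
        }

        public static void main(String[] args) {
            List<TopicMetadata> response =
                errorResponse(Arrays.asList("orders", "clicks"), "UNKNOWN_SERVER_ERROR");
            System.out.println(response.size()); // 2, one entry per requested topic
        }
    }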

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 805b9e7..13e0d8f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -12,15 +12,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -30,11 +21,20 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class MetadataResponse extends AbstractRequestResponse {
 
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
     private static final String BROKERS_KEY_NAME = "brokers";
-    private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
+    private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
 
     // broker level field names
     private static final String NODE_ID_KEY_NAME = "node_id";
@@ -71,18 +71,18 @@ public class MetadataResponse extends AbstractRequestResponse {
     private static final String REPLICAS_KEY_NAME = "replicas";
     private static final String ISR_KEY_NAME = "isr";
 
-    private final Cluster cluster;
-    private final Map<String, Errors> errors;
+    private final Collection<Node> brokers;
+    private final List<TopicMetadata> topicMetadata;
 
-    /**
-     * Constructor for MetadataResponse where there are errors for some of the topics,
-     * error data take precedence over cluster information for particular topic
-     */
-    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
+
+    public MetadataResponse(List<Node> brokers, List<TopicMetadata> topicMetadata) {
         super(new Struct(CURRENT_SCHEMA));
 
-        List<Struct> brokerArray = new ArrayList<Struct>();
-        for (Node node : cluster.nodes()) {
+        this.brokers = brokers;
+        this.topicMetadata = topicMetadata;
+
+        List<Struct> brokerArray = new ArrayList<>();
+        for (Node node : brokers) {
             Struct broker = struct.instance(BROKERS_KEY_NAME);
             broker.set(NODE_ID_KEY_NAME, node.id());
             broker.set(HOST_KEY_NAME, node.host());
@@ -91,51 +91,39 @@ public class MetadataResponse extends AbstractRequestResponse {
         }
         struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
 
+        List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size());
+        for (TopicMetadata metadata : topicMetadata) {
+            Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, metadata.topic);
+            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code());
+
+            List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size());
+            for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
+                Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
+                partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, partitionMetadata.error.code());
+                partitionData.set(PARTITION_KEY_NAME, partitionMetadata.partition);
+                partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id());
+                ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size());
+                for (Node node : partitionMetadata.replicas)
+                    replicas.add(node.id());
+                partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
+                ArrayList<Integer> isr = new ArrayList<>(partitionMetadata.isr.size());
+                for (Node node : partitionMetadata.isr)
+                    isr.add(node.id());
+                partitionData.set(ISR_KEY_NAME, isr.toArray());
+                partitionMetadataArray.add(partitionData);
 
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
-            Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, errorEntry.getKey());
-            topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errorEntry.getValue().code());
-            topicData.set(PARTITION_METADATA_KEY_NAME, new Struct[0]);
-            topicArray.add(topicData);
-        }
-
-        for (String topic : cluster.topics()) {
-            if (!errors.containsKey(topic)) {
-                Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
-                topicData.set(TOPIC_KEY_NAME, topic);
-                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
-                List<Struct> partitionArray = new ArrayList<Struct>();
-                for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
-                    Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
-                    partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
-                    partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
-                    partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
-                    ArrayList<Integer> replicas = new ArrayList<Integer>();
-                    for (Node node : fetchPartitionData.replicas())
-                        replicas.add(node.id());
-                    partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
-                    ArrayList<Integer> isr = new ArrayList<Integer>();
-                    for (Node node : fetchPartitionData.inSyncReplicas())
-                        isr.add(node.id());
-                    partitionData.set(ISR_KEY_NAME, isr.toArray());
-                    partitionArray.add(partitionData);
-                }
-                topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
-                topicArray.add(topicData);
             }
+            topicData.set(PARTITION_METADATA_KEY_NAME, partitionMetadataArray.toArray());
+            topicMetadataArray.add(topicData);
         }
-        struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
-
-        this.cluster = cluster;
-        this.errors = errors;
+        struct.set(TOPIC_METADATA_KEY_NAME, topicMetadataArray.toArray());
     }
 
     public MetadataResponse(Struct struct) {
         super(struct);
-        Map<String, Errors> errors = new HashMap<String, Errors>();
-        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
+
+        Map<Integer, Node> brokers = new HashMap<>();
         Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
         for (int i = 0; i < brokerStructs.length; i++) {
             Struct broker = (Struct) brokerStructs[i];
@@ -144,63 +132,155 @@ public class MetadataResponse extends AbstractRequestResponse {
             int port = broker.getInt(PORT_KEY_NAME);
             brokers.put(nodeId, new Node(nodeId, host, port));
         }
-        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
-        Object[] topicInfos = (Object[]) struct.get(TOPIC_METATDATA_KEY_NAME);
+
+        List<TopicMetadata> topicMetadata = new ArrayList<>();
+        Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME);
         for (int i = 0; i < topicInfos.length; i++) {
             Struct topicInfo = (Struct) topicInfos[i];
-            short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME);
+            Errors topicError = Errors.forCode(topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME));
             String topic = topicInfo.getString(TOPIC_KEY_NAME);
-            if (topicError == Errors.NONE.code()) {
-                Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
-                for (int j = 0; j < partitionInfos.length; j++) {
-                    Struct partitionInfo = (Struct) partitionInfos[j];
-                    int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
-                    int leader = partitionInfo.getInt(LEADER_KEY_NAME);
-                    Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                    Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
-                    Node[] replicaNodes = new Node[replicas.length];
-                    for (int k = 0; k < replicas.length; k++)
-                        replicaNodes[k] = brokers.get(replicas[k]);
-                    Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
-                    Node[] isrNodes = new Node[isr.length];
-                    for (int k = 0; k < isr.length; k++)
-                        isrNodes[k] = brokers.get(isr[k]);
-                    partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
-                }
-            } else {
-                errors.put(topic, Errors.forCode(topicError));
+            List<PartitionMetadata> partitionMetadata = new ArrayList<>();
+
+            Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
+            for (int j = 0; j < partitionInfos.length; j++) {
+                Struct partitionInfo = (Struct) partitionInfos[j];
+                Errors partitionError = Errors.forCode(partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME));
+                int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
+                int leader = partitionInfo.getInt(LEADER_KEY_NAME);
+                Node leaderNode = leader == -1 ? null : brokers.get(leader);
+                Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
+                List<Node> replicaNodes = new ArrayList<>(replicas.length);
+                for (Object replicaNodeId : replicas)
+                    replicaNodes.add(brokers.get(replicaNodeId));
+                Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
+                List<Node> isrNodes = new ArrayList<>(isr.length);
+                for (Object isrNode : isr)
+                    isrNodes.add(brokers.get(isrNode));
+                partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes));
             }
+
+            topicMetadata.add(new TopicMetadata(topicError, topic, partitionMetadata));
         }
 
-        this.errors = errors;
-        this.cluster = new Cluster(brokers.values(), partitions, unauthorizedTopics(errors));
+        this.brokers = brokers.values();
+        this.topicMetadata = topicMetadata;
     }
 
-    private Set<String> unauthorizedTopics(Map<String, Errors> topicErrors) {
-        if (topicErrors.isEmpty())
-            return Collections.emptySet();
+    /**
+     * Get a map of the topics which had metadata errors
+     * @return the map
+     */
+    public Map<String, Errors> errors() {
+        Map<String, Errors> errors = new HashMap<>();
+        for (TopicMetadata metadata : topicMetadata) {
+            if (metadata.error != Errors.NONE)
+                errors.put(metadata.topic(), metadata.error);
+        }
+        return errors;
+    }
 
+    /**
+     * Get a snapshot of the cluster metadata from this response
+     * @return the cluster snapshot
+     */
+    public Cluster cluster() {
         Set<String> unauthorizedTopics = new HashSet<>();
-        for (Map.Entry<String, Errors> topicErrorEntry : topicErrors.entrySet()) {
-            if (topicErrorEntry.getValue() == Errors.TOPIC_AUTHORIZATION_FAILED)
-                unauthorizedTopics.add(topicErrorEntry.getKey());
+        List<PartitionInfo> partitions = new ArrayList<>();
+        for (TopicMetadata metadata : topicMetadata) {
+            if (metadata.error == Errors.NONE) {
+                for (PartitionMetadata partitionMetadata : metadata.partitionMetadata)
+                    partitions.add(new PartitionInfo(
+                            metadata.topic,
+                            partitionMetadata.partition,
+                            partitionMetadata.leader,
+                            partitionMetadata.replicas.toArray(new Node[0]),
+                            partitionMetadata.isr.toArray(new Node[0])));
+            } else if (metadata.error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+                unauthorizedTopics.add(metadata.topic);
+            }
         }
-        return unauthorizedTopics;
+
+        return new Cluster(this.brokers, partitions, unauthorizedTopics);
     }
 
-    public Map<String, Errors> errors() {
-        return this.errors;
+    /**
+     * Get all brokers returned in metadata response
+     * @return the brokers
+     */
+    public Collection<Node> brokers() {
+        return brokers;
     }
 
-    public boolean hasError(String topic) {
-        return this.errors.containsKey(topic);
+    public static MetadataResponse parse(ByteBuffer buffer) {
+        return new MetadataResponse(CURRENT_SCHEMA.read(buffer));
     }
 
-    public Cluster cluster() {
-        return this.cluster;
+    public static class TopicMetadata {
+        private final Errors error;
+        private final String topic;
+        private final List<PartitionMetadata> partitionMetadata;
+
+        public TopicMetadata(Errors error,
+                             String topic,
+                             List<PartitionMetadata> partitionMetadata) {
+            this.error = error;
+            this.topic = topic;
+            this.partitionMetadata = partitionMetadata;
+        }
+
+        public Errors error() {
+            return error;
+        }
+
+        public String topic() {
+            return topic;
+        }
+
+        public List<PartitionMetadata> partitionMetadata() {
+            return partitionMetadata;
+        }
+
     }
 
-    public static MetadataResponse parse(ByteBuffer buffer) {
-        return new MetadataResponse(CURRENT_SCHEMA.read(buffer));
+    public static class PartitionMetadata {
+        private final Errors error;
+        private final int partition;
+        private final Node leader;
+        private final List<Node> replicas;
+        private final List<Node> isr;
+
+        public PartitionMetadata(Errors error,
+                                 int partition,
+                                 Node leader,
+                                 List<Node> replicas,
+                                 List<Node> isr) {
+            this.error = error;
+            this.partition = partition;
+            this.leader = leader;
+            this.replicas = replicas;
+            this.isr = isr;
+        }
+
+        public Errors error() {
+            return error;
+        }
+
+        public int partition() {
+            return partition;
+        }
+
+        public Node leader() {
+            return leader;
+        }
+
+        public List<Node> replicas() {
+            return replicas;
+        }
+
+        public List<Node> isr() {
+            return isr;
+        }
+
     }
+
 }
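
With the response now storing brokers and per-topic metadata directly, cluster() and errors() become derived views: cluster() collects PartitionInfo for error-free topics and reports TOPIC_AUTHORIZATION_FAILED topics as unauthorized. A hedged usage sketch built only from the constructors and accessors visible in this diff (assumes the kafka-clients classes from this tree on the classpath):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.MetadataResponse;

    public class MetadataResponseUsage {
        public static void main(String[] args) {
            Node broker = new Node(0, "localhost", 9092);

            MetadataResponse.PartitionMetadata partition = new MetadataResponse.PartitionMetadata(
                    Errors.NONE, 0, broker, Arrays.asList(broker), Arrays.asList(broker));
            MetadataResponse.TopicMetadata okTopic =
                    new MetadataResponse.TopicMetadata(Errors.NONE, "topic1", Arrays.asList(partition));
            MetadataResponse.TopicMetadata deniedTopic = new MetadataResponse.TopicMetadata(
                    Errors.TOPIC_AUTHORIZATION_FAILED, "topic2",
                    Collections.<MetadataResponse.PartitionMetadata>emptyList());

            MetadataResponse response =
                    new MetadataResponse(Arrays.asList(broker), Arrays.asList(okTopic, deniedTopic));

            // errors() reports only topic2; cluster() carries topic1's partition
            // and passes topic2 to Cluster as an unauthorized topic
            System.out.println(response.errors());                            // {topic2=TOPIC_AUTHORIZATION_FAILED}
            Cluster cluster = response.cluster();
            System.out.println(cluster.partitionsForTopic("topic1").size()); // 1
        }
    }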

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 823d04e..58c3841 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -53,6 +53,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -441,8 +442,7 @@ public class FetcherTest {
     @Test
     public void testGetAllTopics() {
         // sending response before request, as getTopicMetadata is a blocking call
-        client.prepareResponse(
-            new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
 
         Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
 
@@ -453,7 +453,7 @@ public class FetcherTest {
     public void testGetAllTopicsDisconnect() {
         // first try gets a disconnect, next succeeds
         client.prepareResponse(null, true);
-        client.prepareResponse(new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
         Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
         assertEquals(cluster.topics().size(), allTopics.size());
     }
@@ -466,8 +466,7 @@ public class FetcherTest {
 
     @Test
     public void testGetAllTopicsUnauthorized() {
-        client.prepareResponse(new MetadataResponse(cluster,
-                Collections.singletonMap(topicName, Errors.TOPIC_AUTHORIZATION_FAILED)).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.TOPIC_AUTHORIZATION_FAILED).toStruct());
         try {
             fetcher.getAllTopicMetadata(10L);
             fail();
@@ -478,17 +477,13 @@ public class FetcherTest {
 
     @Test(expected = InvalidTopicException.class)
     public void testGetTopicMetadataInvalidTopic() {
-        client.prepareResponse(new MetadataResponse(cluster,
-                Collections.singletonMap(topicName, Errors.INVALID_TOPIC_EXCEPTION)).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION).toStruct());
         fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
     }
 
     @Test
     public void testGetTopicMetadataUnknownTopic() {
-        Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.<PartitionInfo>emptyList(),
-                Collections.<String>emptySet());
-        client.prepareResponse(new MetadataResponse(emptyCluster,
-                Collections.singletonMap(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION).toStruct());
 
         Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
         assertNull(topicMetadata.get(topicName));
@@ -496,12 +491,9 @@ public class FetcherTest {
 
     @Test
     public void testGetTopicMetadataLeaderNotAvailable() {
-        Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.<PartitionInfo>emptyList(),
-                Collections.<String>emptySet());
-        client.prepareResponse(new MetadataResponse(emptyCluster,
-                Collections.singletonMap(topicName, Errors.LEADER_NOT_AVAILABLE)).toStruct());
-        client.prepareResponse(new MetadataResponse(this.cluster,
-                Collections.<String, Errors>emptyMap()).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE).toStruct());
+        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
+
         Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L);
         assertTrue(topicMetadata.containsKey(topicName));
     }
@@ -565,6 +557,23 @@ public class FetcherTest {
         return response.toStruct();
     }
 
+    private MetadataResponse newMetadataResponse(String topic, Errors error) {
+        List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
+        if (error == Errors.NONE) {
+            for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
+                partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
+                        Errors.NONE,
+                        partitionInfo.partition(),
+                        partitionInfo.leader(),
+                        Arrays.asList(partitionInfo.replicas()),
+                        Arrays.asList(partitionInfo.inSyncReplicas())));
+            }
+        }
+
+        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, partitionsMetadata);
+        return new MetadataResponse(cluster.nodes(), Arrays.asList(topicMetadata));
+    }
+
     private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
                                                   SubscriptionState subscriptions,
                                                   Metrics metrics) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index 7a5cef6..fd8a5bc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -12,19 +12,19 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class DefaultPartitionerTest {
     private byte[] keyBytes = "key".getBytes();
     private Partitioner partitioner = new DefaultPartitioner();

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index edeaf63..3023837 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -14,9 +14,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.BrokerEndPoint;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -278,16 +276,16 @@ public class RequestResponseTest {
 
     private AbstractRequestResponse createMetadataResponse() {
         Node node = new Node(1, "host1", 1001);
-        Node[] replicas = new Node[1];
-        replicas[0] = node;
-        Node[] isr = new Node[1];
-        isr[0] = node;
-        Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)),
-                Collections.<String>emptySet());
-
-        Map<String, Errors> errors = new HashMap<String, Errors>();
-        errors.put("topic2", Errors.LEADER_NOT_AVAILABLE);
-        return new MetadataResponse(cluster, errors);
+        List<Node> replicas = Arrays.asList(node);
+        List<Node> isr = Arrays.asList(node);
+
+        List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
+        allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic1",
+                Arrays.asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
+        allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2",
+                Collections.<MetadataResponse.PartitionMetadata>emptyList()));
+
+        return new MetadataResponse(Arrays.asList(node), allTopicMetadata);
     }
 
     private AbstractRequest createOffsetCommitRequest() {
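
The rewritten createMetadataResponse() exercises both branches: a healthy topic with one partition and an errored topic with no partition metadata. Elsewhere this suite round-trips such objects through their wire format; a hedged sketch of that round-trip (sizeOf()/writeTo() follow the serialization pattern these tests use elsewhere; treat the exact calls as assumptions):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.MetadataResponse;

    public class MetadataRoundTrip {
        public static void main(String[] args) {
            Node node = new Node(1, "host1", 1001);
            MetadataResponse.TopicMetadata errored = new MetadataResponse.TopicMetadata(
                    Errors.LEADER_NOT_AVAILABLE, "topic2",
                    Collections.<MetadataResponse.PartitionMetadata>emptyList());
            MetadataResponse original =
                    new MetadataResponse(Arrays.asList(node), Arrays.asList(errored));

            // Serialize to the wire format, then parse it back
            ByteBuffer buffer = ByteBuffer.allocate(original.sizeOf());
            original.writeTo(buffer);
            buffer.rewind();
            MetadataResponse parsed = MetadataResponse.parse(buffer);

            System.out.println(parsed.errors()); // {topic2=LEADER_NOT_AVAILABLE}
        }
    }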

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index a8d9964..b857315 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -92,8 +92,9 @@ class AdminClient(val time: Time,
     val request = new MetadataRequest(List[String]())
     val responseBody = sendAnyNode(ApiKeys.METADATA, request)
     val response = new MetadataResponse(responseBody)
-    if (!response.errors().isEmpty)
-      debug(s"Metadata request contained errors: ${response.errors()}")
+    val errors = response.errors()
+    if (!errors.isEmpty)
+      debug(s"Metadata request contained errors: ${errors}")
     response.cluster().nodes().asScala.toList
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a868400..3fb44d3 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,23 +18,25 @@
 package kafka.admin
 
 import kafka.common._
-import kafka.cluster.{BrokerEndPoint, Broker}
+import kafka.cluster.Broker
 
 import kafka.log.LogConfig
 import kafka.server.ConfigType
 import kafka.utils._
 import kafka.utils.ZkUtils._
-import kafka.api.{TopicMetadata, PartitionMetadata}
 
 import java.util.Random
 import java.util.Properties
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException}
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.MetadataResponse
 
 import scala.Predef._
 import scala.collection._
-import mutable.ListBuffer
+import scala.collection.JavaConverters._
 import scala.collection.mutable
+import mutable.ListBuffer
 import collection.Map
 import collection.Set
 
@@ -390,15 +392,18 @@ object AdminUtils extends Logging {
   def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
     zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
 
-  def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata =
+  def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata =
     fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
 
-  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[TopicMetadata] = {
+  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] = {
     val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
     topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo))
   }
 
-  private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
+  private def fetchTopicMetadataFromZk(topic: String,
+                                       zkUtils: ZkUtils,
+                                       cachedBrokerInfo: mutable.HashMap[Int, Broker],
+                                       protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): MetadataResponse.TopicMetadata = {
     if(zkUtils.pathExists(getTopicPath(topic))) {
       val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get
       val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
@@ -409,22 +414,22 @@ object AdminUtils extends Logging {
         val leader = zkUtils.getLeaderForPartition(topic, partition)
         debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
-        var leaderInfo: Option[BrokerEndPoint] = None
-        var replicaInfo: Seq[BrokerEndPoint] = Nil
-        var isrInfo: Seq[BrokerEndPoint] = Nil
+        var leaderInfo: Node = Node.noNode()
+        var replicaInfo: Seq[Node] = Nil
+        var isrInfo: Seq[Node] = Nil
         try {
           leaderInfo = leader match {
             case Some(l) =>
               try {
-                Some(getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol))
+                getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(protocol)
               } catch {
                 case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
               }
             case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
           }
           try {
-            replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getBrokerEndPoint(protocol))
-            isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol))
+            replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(protocol))
+            isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(protocol))
           } catch {
             case e: Throwable => throw new ReplicaNotAvailableException(e)
           }
@@ -434,18 +439,17 @@ object AdminUtils extends Logging {
           if(isrInfo.size < inSyncReplicas.size)
             throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
               inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code)
+          new MetadataResponse.PartitionMetadata(Errors.NONE, partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
         } catch {
           case e: Throwable =>
             debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
-            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-              Errors.forException(e).code)
+            new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
         }
       }
-      new TopicMetadata(topic, partitionMetadata)
+      new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toList.asJava)
     } else {
       // topic doesn't exist, send appropriate error code
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+      new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList())
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index b0e41ec..ae5ea58 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -21,7 +21,6 @@ import kafka.cluster.BrokerEndPoint
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
-import kafka.common._
 import org.apache.kafka.common.protocol.Errors
 
 object TopicMetadata {

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index be13586..0654e3d 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -30,22 +30,6 @@ import scala.collection.mutable.ListBuffer
 object TopicMetadataRequest extends Logging {
   val CurrentVersion = 0.shortValue
   val DefaultClientId = ""
-
-  /**
-   * TopicMetadataRequest has the following format -
-   * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes)
-   */
-
-  def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
-    val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
-    val topics = new ListBuffer[String]()
-    for(i <- 0 until numTopics)
-      topics += readShortString(buffer)
-    new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList)
-  }
 }
 
 case class TopicMetadataRequest(versionId: Short,

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index b56cae9..7340f14 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 
 import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
 import kafka.utils.Json
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.protocol.SecurityProtocol
 
 /**
@@ -103,7 +104,7 @@ object Broker {
   }
 }
 
-case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) {
+case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint]) {
 
   override def toString: String = id + " : " + endPoints.values.mkString("(",",",")")
 
@@ -133,13 +134,14 @@ case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) {
     endPoints.contains(protocolType)
   }
 
+  def getNode(protocolType: SecurityProtocol): Node = {
+    val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)))
+    new Node(id, endpoint.host, endpoint.port)
+  }
+
   def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = {
-    val endpoint = endPoints.get(protocolType)
-    endpoint match {
-      case Some(endpoint) => new BrokerEndPoint(id, endpoint.host, endpoint.port)
-      case None =>
-        throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id))
-    }
+    val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)))
+    new BrokerEndPoint(id, endpoint.host, endpoint.port)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 916c438..1105802 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -63,7 +63,6 @@ object RequestChannel extends Logging {
     // o.a.k.common.requests.AbstractRequest.getRequest()
     private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
       Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
-        ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
         ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
       )
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8f3a2ad..5f9ec8b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.nio.ByteBuffer
 import java.lang.{Long => JLong, Short => JShort}
+import java.util.Properties
 
 import kafka.admin.AdminUtils
 import kafka.api._
@@ -40,7 +41,7 @@ import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse,
 DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
 LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
 StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse,
-OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
+MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
@@ -258,7 +259,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           case (topicPartition, partitionData) =>
             val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
             try {
-              if (metadataCache.getTopicMetadata(Set(topicPartition.topic), request.securityProtocol).size <= 0)
+              if (!metadataCache.hasTopicMetadata(topicPartition.topic))
                 (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
               else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
                 (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
@@ -618,84 +619,105 @@ class KafkaApis(val requestChannel: RequestChannel,
     ret.toSeq.sortBy(- _)
   }
 
-  private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[TopicMetadata] = {
+  private def createTopic(topic: String,
+                          numPartitions: Int,
+                          replicationFactor: Int,
+                          properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
+    try {
+      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties)
+      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
+        .format(topic, numPartitions, replicationFactor))
+      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())
+    } catch {
+      case e: TopicExistsException => // let it go, possibly another broker created this topic
+        new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())
+      case itex: InvalidTopicException =>
+        new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, java.util.Collections.emptyList())
+    }
+  }
+
+  private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = {
+    val aliveBrokers = metadataCache.getAliveBrokers
+    val offsetsTopicReplicationFactor =
+      if (aliveBrokers.nonEmpty)
+        Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
+      else
+        config.offsetsTopicReplicationFactor.toInt
+    createTopic(GroupCoordinator.GroupMetadataTopicName, config.offsetsTopicPartitions,
+      offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
+  }
+
+  private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
+    val topicMetadata = metadataCache.getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), securityProtocol)
+    topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
+  }
+
+  private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = {
     val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol)
-    if (topics.size > 0 && topicResponses.size != topics.size) {
+    if (topics.isEmpty || topicResponses.size == topics.size) {
+      topicResponses
+    } else {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == GroupCoordinator.GroupMetadataTopicName || config.autoCreateTopicsEnable) {
-          try {
-            if (topic == GroupCoordinator.GroupMetadataTopicName) {
-              val aliveBrokers = metadataCache.getAliveBrokers
-              val offsetsTopicReplicationFactor =
-                if (aliveBrokers.length > 0)
-                  Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
-                else
-                  config.offsetsTopicReplicationFactor.toInt
-              AdminUtils.createTopic(zkUtils, topic, config.offsetsTopicPartitions,
-                                     offsetsTopicReplicationFactor,
-                                     coordinator.offsetsTopicConfigs)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
-            }
-            else {
-              AdminUtils.createTopic(zkUtils, topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                   .format(topic, config.numPartitions, config.defaultReplicationFactor))
-            }
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code)
-          } catch {
-            case e: TopicExistsException => // let it go, possibly another broker created this topic
-              new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code)
-            case itex: InvalidTopicException =>
-              new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.INVALID_TOPIC_EXCEPTION.code)
-          }
+        if (topic == GroupCoordinator.GroupMetadataTopicName) {
+          createGroupMetadataTopic()
+        } else if (config.autoCreateTopicsEnable) {
+          createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
-          new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList())
         }
       }
-      topicResponses.appendAll(responsesForNonExistentTopics)
+      topicResponses ++ responsesForNonExistentTopics
     }
-    topicResponses
   }
 
   /**
    * Handle a topic metadata request
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
-    val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-
-    //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
-    val topics = if (metadataRequest.topics.isEmpty) {
-      val topicResponses = metadataCache.getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
-      topicResponses.map(_.topic).filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))).toSet
+    val metadataRequest = request.body.asInstanceOf[MetadataRequest]
+
+    val topics = metadataRequest.topics.asScala.toSet
+    var (authorizedTopics, unauthorizedTopics) = if (metadataRequest.topics.isEmpty) {
+      //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
+      val authorized = metadataCache.getAllTopics()
+        .filter(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
+      (authorized, mutable.Set[String]())
     } else {
-      metadataRequest.topics.toSet
+      topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
     }
 
-    //when topics is empty this will be a duplicate authorization check but given this should just be a cache lookup, it should not matter.
-    var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
-
-    if (!authorizedTopics.isEmpty) {
-      val topicResponses = metadataCache.getTopicMetadata(authorizedTopics, request.securityProtocol)
-      if (config.autoCreateTopicsEnable && topicResponses.size != authorizedTopics.size) {
-        val nonExistentTopics: Set[String] = topics -- topicResponses.map(_.topic).toSet
-        authorizer.foreach {
-          az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
-            authorizedTopics --= nonExistentTopics
-            unauthorizedTopics ++= nonExistentTopics
+    if (authorizedTopics.nonEmpty) {
+      val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
+      if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
+        authorizer.foreach { az =>
+          if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
+            authorizedTopics --= nonExistingTopics
+            unauthorizedTopics ++= nonExistingTopics
           }
         }
       }
     }
 
-    val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.TOPIC_AUTHORIZATION_FAILED.code))
+    val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
+      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, java.util.Collections.emptyList()))
+
+    val topicMetadata = if (authorizedTopics.isEmpty)
+      Seq.empty[MetadataResponse.TopicMetadata]
+    else
+      getTopicMetadata(authorizedTopics, request.securityProtocol)
 
-    val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
     val brokers = metadataCache.getAliveBrokers
-    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
-    val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata  ++ unauthorizedTopicMetaData, metadataRequest.correlationId)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
+
+    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","),
+      brokers.mkString(","), request.header.correlationId, request.header.clientId))
+
+    val responseHeader = new ResponseHeader(request.header.correlationId)
+    val responseBody = new MetadataResponse(
+      brokers.map(_.getNode(request.securityProtocol)).asJava,
+      (topicMetadata ++ unauthorizedTopicMetadata).asJava
+    )
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
   }
 
   /*
@@ -725,7 +747,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val responseInfo = authorizedTopicPartitions.map { topicPartition =>
           val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
           try {
-            if (metadataCache.getTopicMetadata(Set(topicPartition.topic), request.securityProtocol).isEmpty)
+            if (!metadataCache.hasTopicMetadata(topicPartition.topic))
               (topicPartition, unknownTopicPartitionResponse)
             else {
               val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
@@ -769,16 +791,21 @@ class KafkaApis(val requestChannel: RequestChannel,
       val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
 
       // get metadata (and create the topic if necessary)
-      val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
-      val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
-        partitionMetadata => partitionMetadata.leader
-      }
+      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
 
-      val responseBody = coordinatorEndpoint match {
-        case None =>
-          new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
-        case Some(endpoint) =>
-          new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
+      val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
+        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
+      } else {
+        val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
+          .find(_.partition == partition)
+          .map(_.leader())
+
+        coordinatorEndpoint match {
+          case Some(endpoint) if !endpoint.isEmpty =>
+            new GroupCoordinatorResponse(Errors.NONE.code, endpoint)
+          case _ =>
+            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
+        }
       }
 
       trace("Sending consumer metadata %s for correlation id %d to client %s."
@@ -788,8 +815,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDescribeGroupRequest(request: RequestChannel.Request) {
-    import JavaConverters._
-
     val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
     val responseHeader = new ResponseHeader(request.header.correlationId)
 
@@ -814,8 +839,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
-    import JavaConverters._
-
     val responseHeader = new ResponseHeader(request.header.correlationId)
     val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
       ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)

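For readers tracking the migration in KafkaApis above: the broker now assembles the client-side Java
response objects directly. A minimal, self-contained sketch of that construction, using only the
constructors exercised by the patch; the host, port, and topic name are placeholder values, not taken
from the patch:

import java.util.Collections
import org.apache.kafka.common.Node
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataResponse

object MetadataResponseSketch extends App {
  // Placeholder broker node; a real broker derives this via getNode(protocol).
  val node = new Node(0, "localhost", 9092)

  // A single healthy partition whose leader, replicas and ISR are all `node`.
  val partition = new MetadataResponse.PartitionMetadata(
    Errors.NONE, 0, node, Collections.singletonList(node), Collections.singletonList(node))

  // Topic-level metadata wrapping the partition, then the full response.
  val topicMetadata = new MetadataResponse.TopicMetadata(
    Errors.NONE, "example-topic", Collections.singletonList(partition))
  val response = new MetadataResponse(
    Collections.singletonList(node), Collections.singletonList(topicMetadata))

  println(response)
}
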
http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4be795d..1fdd717 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,124 +17,151 @@
 
 package kafka.server
 
-import kafka.cluster.{EndPoint, BrokerEndPoint, Broker}
-import kafka.common.TopicAndPartition
+import java.util.EnumMap
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.api._
-import kafka.controller.KafkaController.StateChangeLogger
-import kafka.controller.LeaderIsrAndControllerEpoch
-import org.apache.kafka.common.errors.{ReplicaNotAvailableException, LeaderNotAvailableException}
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest
-import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
 import scala.collection.{Seq, Set, mutable}
 import scala.collection.JavaConverters._
-import kafka.utils.Logging
+import kafka.cluster.{Broker, EndPoint}
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
 import kafka.utils.CoreUtils._
-
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
+import kafka.utils.Logging
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
 private[server] class MetadataCache(brokerId: Int) extends Logging {
-  private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
-    new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
-  private var aliveBrokers: Map[Int, Broker] = Map()
+  private val stateChangeLogger = KafkaController.stateChangeLogger
+  private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]()
+  private val aliveBrokers = mutable.Map[Int, Broker]()
+  private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
   this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
 
-  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol) = {
+  private def getAliveEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol): Seq[Node] = {
+    val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
+    brokers.foreach { brokerId =>
+      getAliveEndpoint(brokerId, protocol).foreach(result +=)
+    }
+    result
+  }
 
-    val isAllTopics = topics.isEmpty
-    val topicsRequested = if(isAllTopics) cache.keySet else topics
-    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+  private def getAliveEndpoint(brokerId: Int, protocol: SecurityProtocol): Option[Node] =
+    aliveNodes.get(brokerId).flatMap(_.get(protocol))
+
+  private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
+    cache.get(topic).map { partitions =>
+      partitions.map { case (partitionId, partitionState) =>
+        val topicPartition = TopicAndPartition(topic, partitionId)
+
+        val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+        val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, protocol)
+
+        val replicas = partitionState.allReplicas
+        val replicaInfo = getAliveEndpoints(replicas, protocol)
+
+        maybeLeader match {
+          case None =>
+            debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
+            new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
+              replicaInfo.asJava, java.util.Collections.emptyList())
+
+          case Some(leader) =>
+            val isr = leaderAndIsr.isr
+            val isrInfo = getAliveEndpoints(isr, protocol)
+
+            if (replicaInfo.size < replicas.size) {
+              debug("Error while fetching metadata for %s: replica information not available for following brokers %s"
+                .format(topicPartition, replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")))
+
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
+                replicaInfo.asJava, isrInfo.asJava)
+            } else if (isrInfo.size < isr.size) {
+              debug("Error while fetching metadata for %s: in sync replica information not available for following brokers %s"
+                .format(topicPartition, isr.filterNot(isrInfo.map(_.id).contains).mkString(",")))
+              new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
+                replicaInfo.asJava, isrInfo.asJava)
+            } else {
+              new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava,
+                isrInfo.asJava)
+            }
+        }
+      }
+    }
+  }
+
+  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol): Seq[MetadataResponse.TopicMetadata] = {
     inReadLock(partitionMetadataLock) {
-      for (topic <- topicsRequested) {
-        if (isAllTopics || cache.contains(topic)) {
-          val partitionStateInfos = cache(topic)
-          val partitionMetadata = partitionStateInfos.map {
-            case (partitionId, partitionState) =>
-              val replicas = partitionState.allReplicas
-              val replicaInfo: Seq[BrokerEndPoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol))
-              var leaderInfo: Option[BrokerEndPoint] = None
-              var leaderBrokerInfo: Option[Broker] = None
-              var isrInfo: Seq[BrokerEndPoint] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              val topicPartition = TopicAndPartition(topic, partitionId)
-              try {
-                leaderBrokerInfo = aliveBrokers.get(leader)
-                if (!leaderBrokerInfo.isDefined)
-                  throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition))
-                else
-                  leaderInfo = Some(leaderBrokerInfo.get.getBrokerEndPoint(protocol))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).map(_.getBrokerEndPoint(protocol))
-                if (replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if (isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, Errors.NONE.code)
-              } catch {
-                case e: Throwable =>
-                  debug("Error while fetching metadata for %s: %s".format(topicPartition, e.toString))
-                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
-                    Errors.forException(e).code)
-              }
-          }
-          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+      val topicsRequested = if (topics.isEmpty) cache.keySet else topics
+      topicsRequested.toSeq.flatMap { topic =>
+        getPartitionMetadata(topic, protocol).map { partitionMetadata =>
+          new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toBuffer.asJava)
         }
       }
     }
-    topicResponses
   }
 
-  def getAliveBrokers = {
+  def hasTopicMetadata(topic: String): Boolean = {
+    inReadLock(partitionMetadataLock) {
+      cache.contains(topic)
+    }
+  }
+
+  def getAllTopics(): Set[String] = {
+    inReadLock(partitionMetadataLock) {
+      cache.keySet.toSet
+    }
+  }
+
+  def getNonExistingTopics(topics: Set[String]): Set[String] = {
+    inReadLock(partitionMetadataLock) {
+      topics -- cache.keySet
+    }
+  }
+
+  def getAliveBrokers: Seq[Broker] = {
     inReadLock(partitionMetadataLock) {
       aliveBrokers.values.toSeq
     }
   }
 
-  def addOrUpdatePartitionInfo(topic: String,
-                               partitionId: Int,
-                               stateInfo: PartitionStateInfo) {
+  private def addOrUpdatePartitionInfo(topic: String,
+                                       partitionId: Int,
+                                       stateInfo: PartitionStateInfo) {
     inWriteLock(partitionMetadataLock) {
-      cache.get(topic) match {
-        case Some(infos) => infos.put(partitionId, stateInfo)
-        case None => {
-          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
-          cache.put(topic, newInfos)
-          newInfos.put(partitionId, stateInfo)
-        }
-      }
+      val infos = cache.getOrElseUpdate(topic, mutable.Map())
+      infos(partitionId) = stateInfo
     }
   }
 
   def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
     inReadLock(partitionMetadataLock) {
-      cache.get(topic) match {
-        case Some(partitionInfos) => partitionInfos.get(partitionId)
-        case None => None
-      }
+      cache.get(topic).flatMap(_.get(partitionId))
     }
   }
 
-  def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest,
-                  brokerId: Int,
-                  stateChangeLogger: StateChangeLogger) {
+  def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) {
     inWriteLock(partitionMetadataLock) {
-      aliveBrokers = updateMetadataRequest.liveBrokers.asScala.map { broker =>
-        val endPoints = broker.endPoints.asScala.map { case (protocol, ep) =>
-          (protocol, EndPoint(ep.host, ep.port, protocol))
-        }.toMap
-        (broker.id, Broker(broker.id, endPoints))
-      }.toMap
+      aliveNodes.clear()
+      aliveBrokers.clear()
+      updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
+        val nodes = new EnumMap[SecurityProtocol, Node](classOf[SecurityProtocol])
+        val endPoints = new EnumMap[SecurityProtocol, EndPoint](classOf[SecurityProtocol])
+        broker.endPoints.asScala.foreach { case (protocol, ep) =>
+          endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol))
+          nodes.put(protocol, new Node(broker.id, ep.host, ep.port))
+        }
+        aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala)
+        aliveNodes(broker.id) = nodes.asScala
+      }
 
       updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
         if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
@@ -167,16 +194,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  private def removePartitionInfo(topic: String, partitionId: Int) = {
-    cache.get(topic) match {
-      case Some(infos) => {
-        infos.remove(partitionId)
-        if(infos.isEmpty) {
-          cache.remove(topic)
-        }
-        true
-      }
-      case None => false
-    }
+  private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
+    cache.get(topic).map { infos =>
+      infos.remove(partitionId)
+      if (infos.isEmpty) cache.remove(topic)
+      true
+    }.getOrElse(false)
   }
+
 }

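A rough usage sketch of the trimmed-down cache API above. MetadataCache is private[server], so the
helper is assumed to live in the kafka.server package; the cache instance is assumed to have been
populated through updateCache(...), and the helper object itself is hypothetical, not part of the patch:

package kafka.server

import org.apache.kafka.common.protocol.SecurityProtocol

object MetadataCacheUsageSketch {
  def describe(cache: MetadataCache, topics: Set[String]): Unit = {
    // Topics the cache has never heard of (these are the ones KafkaApis may auto-create).
    val missing = cache.getNonExistingTopics(topics)
    if (missing.nonEmpty)
      println("not in cache: " + missing.mkString(","))

    // Per-topic metadata for the rest, with endpoints resolved for PLAINTEXT.
    cache.getTopicMetadata(topics -- missing, SecurityProtocol.PLAINTEXT).foreach { tm =>
      println(tm.topic + ": error=" + tm.error + ", partitions=" + tm.partitionMetadata.size)
    }
  }
}
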
http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e388d98..5655313 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -582,7 +582,7 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.warn(stateControllerEpochErrorMessage)
         throw new ControllerMovedException(stateControllerEpochErrorMessage)
       } else {
-        metadataCache.updateCache(correlationId, updateMetadataRequest, localBrokerId, stateChangeLogger)
+        metadataCache.updateCache(correlationId, updateMetadataRequest)
         controllerEpoch = updateMetadataRequest.controllerEpoch
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ccc86df..f39ed01 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -264,7 +264,7 @@ class ZkUtils(val zkClient: ZkClient,
    * @param advertisedEndpoints
    * @param jmxPort
    */
-  def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
+  def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
     val brokerIdPath = BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
 

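The parameter here is loosened from immutable.Map to collection.Map, consistent with the rewritten
MetadataCache, which builds its endpoint maps by wrapping a java.util.EnumMap via asScala; that wrapper
is a mutable view and does not conform to immutable.Map. A tiny stand-alone illustration (String
substitutes for the EndPoint value type to keep the snippet self-contained):

import java.util.EnumMap
import scala.collection.JavaConverters._
import org.apache.kafka.common.protocol.SecurityProtocol

object EnumMapViewSketch extends App {
  val endPoints = new EnumMap[SecurityProtocol, String](classOf[SecurityProtocol])
  endPoints.put(SecurityProtocol.PLAINTEXT, "localhost:9092")

  // asScala yields a mutable wrapper: it satisfies collection.Map,
  // but could not be passed where immutable.Map is required.
  val view: collection.Map[SecurityProtocol, String] = endPoints.asScala
  println(view(SecurityProtocol.PLAINTEXT))
}
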
http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index fafc4b0..ca9dac4 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -145,14 +145,6 @@ object SerializationTestUtils {
     )
   }
 
-  def createTestTopicMetadataRequest: TopicMetadataRequest = {
-    new TopicMetadataRequest(1, 1, "client 1", Seq(topic1, topic2))
-  }
-
-  def createTestTopicMetadataResponse: TopicMetadataResponse = {
-    new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)).toSeq, Seq(topicmetaData1, topicmetaData2), 1)
-  }
-
   def createTestOffsetCommitRequestV2: OffsetCommitRequest = {
     new OffsetCommitRequest(
       groupId = "group 1",
@@ -217,8 +209,6 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val fetchRequest = SerializationTestUtils.createTestFetchRequest
   private val offsetRequest = SerializationTestUtils.createTestOffsetRequest
   private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
-  private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest
-  private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
   private val offsetCommitRequestV0 = SerializationTestUtils.createTestOffsetCommitRequestV0
   private val offsetCommitRequestV1 = SerializationTestUtils.createTestOffsetCommitRequestV1
   private val offsetCommitRequestV2 = SerializationTestUtils.createTestOffsetCommitRequestV2
@@ -234,8 +224,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
 
     val requestsAndResponses =
       collection.immutable.Seq(producerRequest, producerResponse,
-                               fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest,
-                               topicMetadataResponse,
+                               fetchRequest, offsetRequest, offsetResponse,
                                offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,
                                offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 2400cfb..7c9f3ae 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -18,10 +18,9 @@
 package kafka.integration
 
 import java.io.File
-import java.nio.ByteBuffer
 
 import kafka.admin.AdminUtils
-import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
+import kafka.api.TopicMetadataResponse
 import kafka.client.ClientUtils
 import kafka.cluster.{Broker, BrokerEndPoint}
 import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
@@ -64,23 +63,6 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testTopicMetadataRequest {
-    // create topic
-    val topic = "test"
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
-
-    // create a topic metadata request
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
-
-    val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
-    topicMetadataRequest.writeTo(serializedMetadataRequest)
-    serializedMetadataRequest.rewind()
-    val deserializedMetadataRequest = TopicMetadataRequest.readFrom(serializedMetadataRequest)
-
-    assertEquals(topicMetadataRequest, deserializedMetadataRequest)
-  }
-
-  @Test
   def testBasicTopicMetadata {
     // create topic
     val topic = "test"

http://git-wip-us.apache.org/repos/asf/kafka/blob/764d8ca9/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 12b3583..de19f6f 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -353,7 +353,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       // create topic
       AdminUtils.createTopic(zkUtils, "new-topic", 2, 1)
       TestUtils.waitUntilTrue(() =>
-        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).errorCode != Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
+        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).error != Errors.UNKNOWN_TOPIC_OR_PARTITION,
         "Topic new-topic not created after timeout",
         waitTime = zookeeper.tickTime)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "new-topic", 0)

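The ProducerTest tweak reflects the same theme as the rest of the patch: comparisons move from raw
short error codes to the Errors enum itself. A small sketch of the two idioms, assuming the standard
Errors.forCode lookup on org.apache.kafka.common.protocol.Errors:

import org.apache.kafka.common.protocol.Errors

object ErrorsEnumSketch extends App {
  val code: Short = Errors.UNKNOWN_TOPIC_OR_PARTITION.code

  // Old style: compare raw short codes.
  println(code != Errors.UNKNOWN_TOPIC_OR_PARTITION.code)   // false

  // New style: recover the enum once and compare constants directly.
  val error: Errors = Errors.forCode(code)
  println(error != Errors.UNKNOWN_TOPIC_OR_PARTITION)       // false
}
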
