kafka-commits mailing list archives

From jun...@apache.org
Subject [3/3] kafka git commit: kafka-1809; Refactor brokers to allow listening on multiple ports and IPs; patched by Gwen Shapira; reviewed by Joel Koshy and Jun Rao
Date Mon, 06 Apr 2015 00:21:50 GMT
kafka-1809; Refactor brokers to allow listening on multiple ports and IPs; patched by Gwen Shapira; reviewed by Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53f31432
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53f31432
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53f31432

Branch: refs/heads/trunk
Commit: 53f31432a0e1da78abf31ad42297790445083072
Parents: 07598ad
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Sun Apr 5 17:21:37 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sun Apr 5 17:21:37 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/ProducerConfig.java  |   1 -
 .../kafka/common/protocol/SecurityProtocol.java |  63 +++++++++
 .../org/apache/kafka/common/utils/Utils.java    |   4 +-
 .../apache/kafka/common/utils/UtilsTest.java    |   4 +-
 config/server.properties                        |   4 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  22 +--
 .../main/scala/kafka/admin/TopicCommand.scala   |   6 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  78 +++++++++++
 .../kafka/api/ConsumerMetadataResponse.scala    |  12 +-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |  10 +-
 .../main/scala/kafka/api/TopicMetadata.scala    |  20 ++-
 .../scala/kafka/api/TopicMetadataResponse.scala |   6 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |  41 ++++--
 .../main/scala/kafka/client/ClientUtils.scala   |  25 ++--
 core/src/main/scala/kafka/cluster/Broker.scala  | 118 ++++++++++++----
 .../scala/kafka/cluster/BrokerEndPoint.scala    |  67 +++++++++
 .../src/main/scala/kafka/cluster/EndPoint.scala |  78 +++++++++++
 .../BrokerEndPointNotAvailableException.scala   |  22 +++
 .../kafka/consumer/ConsumerFetcherManager.scala |  10 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |   4 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   3 +-
 .../controller/ControllerChannelManager.scala   |  13 +-
 .../kafka/controller/KafkaController.scala      |   9 +-
 .../javaapi/ConsumerMetadataResponse.scala      |   5 +-
 .../scala/kafka/javaapi/TopicMetadata.scala     |   8 +-
 .../scala/kafka/network/RequestChannel.scala    |   5 +-
 .../main/scala/kafka/network/SocketServer.scala |  90 ++++++++----
 .../scala/kafka/producer/ProducerPool.scala     |  18 +--
 .../kafka/server/AbstractFetcherManager.scala   |   8 +-
 .../kafka/server/AbstractFetcherThread.scala    |   5 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  12 +-
 .../main/scala/kafka/server/KafkaConfig.scala   | 105 ++++++++++++--
 .../scala/kafka/server/KafkaHealthcheck.scala   |  25 ++--
 .../main/scala/kafka/server/KafkaServer.scala   | 114 ++++++++-------
 .../main/scala/kafka/server/MetadataCache.scala |  25 ++--
 .../kafka/server/ReplicaFetcherManager.scala    |   4 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     |   4 +-
 .../kafka/tools/ConsumerOffsetChecker.scala     |   2 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |   6 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala |  18 ++-
 .../scala/kafka/tools/UpdateOffsetsInZK.scala   |   5 +-
 core/src/main/scala/kafka/utils/Utils.scala     |   9 ++
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  38 +++--
 .../kafka/api/ProducerSendTest.scala            |   2 -
 .../scala/other/kafka/TestOffsetManager.scala   |   2 +-
 .../test/scala/unit/kafka/KafkaConfigTest.scala |   8 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |   7 +-
 .../api/RequestResponseSerializationTest.scala  |  74 ++++++++--
 .../unit/kafka/cluster/BrokerEndPointTest.scala | 124 ++++++++++++++++
 .../unit/kafka/integration/FetcherTest.scala    |   4 +-
 .../integration/KafkaServerTestHarness.scala    |   6 +-
 .../kafka/integration/TopicMetadataTest.scala   |  15 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   2 +-
 .../unit/kafka/network/SocketServerTest.scala   |  52 +++++--
 .../unit/kafka/producer/AsyncProducerTest.scala |   8 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  27 ++--
 .../unit/kafka/server/AdvertiseBrokerTest.scala |  15 +-
 .../kafka/server/KafkaConfigConfigDefTest.scala |   8 +-
 .../unit/kafka/server/KafkaConfigTest.scala     | 140 +++++++++++++++----
 .../unit/kafka/server/LeaderElectionTest.scala  |  18 ++-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  12 +-
 system_test/README.txt                          |  34 ++---
 .../testcase_0001/testcase_0001_properties.json |   8 +-
 .../testcase_0002/testcase_0002_properties.json |   8 +-
 .../testcase_0003/testcase_0003_properties.json |   8 +-
 .../testcase_0004/testcase_0004_properties.json |   8 +-
 .../testcase_0005/testcase_0005_properties.json |   8 +-
 .../testcase_0006/testcase_0006_properties.json |   8 +-
 .../testcase_0007/testcase_0007_properties.json |   8 +-
 .../testcase_0008/testcase_0008_properties.json |   8 +-
 .../testcase_0009/testcase_0009_properties.json |   8 +-
 .../testcase_0101/testcase_0101_properties.json |   8 +-
 .../testcase_0102/testcase_0102_properties.json |   8 +-
 .../testcase_0103/testcase_0103_properties.json |   8 +-
 .../testcase_0104/testcase_0104_properties.json |   8 +-
 .../testcase_0105/testcase_0105_properties.json |   8 +-
 .../testcase_0106/testcase_0106_properties.json |   8 +-
 .../testcase_0107/testcase_0107_properties.json |   8 +-
 .../testcase_0108/testcase_0108_properties.json |   8 +-
 .../testcase_0109/testcase_0109_properties.json |   8 +-
 .../testcase_1/testcase_1_properties.json       |   6 +-
 system_test/run_all.sh                          |   7 +
 system_test/run_all_replica.sh                  |   7 +
 system_test/run_sanity.sh                       |   3 -
 system_test/testcase_to_run.json                |   5 -
 system_test/testcase_to_run_all_replica.json    | 123 ++++++++++++++++
 system_test/utils/kafka_system_test_utils.py    |   1 +
 88 files changed, 1497 insertions(+), 487 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index fa9daae..ca1c7fe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -169,7 +169,6 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
 
-
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
new file mode 100644
index 0000000..d3394ee
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public enum SecurityProtocol {
+    /** Un-authenticated, non-encrypted channel */
+    PLAINTEXT(0, "PLAINTEXT"),
+    /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
+    TRACE(Short.MAX_VALUE, "TRACE");
+
+    private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL = new HashMap<Short, SecurityProtocol>();
+    private static final List<String> NAMES = new ArrayList<String>();
+
+    static {
+        for (SecurityProtocol proto: SecurityProtocol.values()) {
+            CODE_TO_SECURITY_PROTOCOL.put(proto.id, proto);
+            NAMES.add(proto.name);
+        }
+    }
+
+    /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol  */
+    public final short id;
+
+    /** Name of the security protocol. This may be used by client configuration. */
+    public final String name;
+
+    private SecurityProtocol(int id, String name) {
+        this.id = (short) id;
+        this.name = name;
+    }
+
+    public static String getName(int id) {
+        return CODE_TO_SECURITY_PROTOCOL.get(id).name;
+    }
+
+    public static List<String> getNames() {
+        return NAMES;
+    }
+
+    public static SecurityProtocol forId(Short id) {
+        return CODE_TO_SECURITY_PROTOCOL.get(id);
+    }
+
+}
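For reference, a minimal usage sketch for the new enum (illustrative only; the wrapping object name is hypothetical):

    import org.apache.kafka.common.protocol.SecurityProtocol

    object SecurityProtocolExample extends App {
      // Resolve a protocol from its wire id (0 is PLAINTEXT)
      val proto = SecurityProtocol.forId(0.toShort)
      println(proto)                        // PLAINTEXT
      // Look up a name by id, or list all registered names
      println(SecurityProtocol.getName(0))  // PLAINTEXT
      println(SecurityProtocol.getNames)    // [PLAINTEXT, TRACE]
    }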

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 920b51a..39e8d7c 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -27,7 +27,9 @@ import org.apache.kafka.common.KafkaException;
 
 public class Utils {
 
-    private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)");
+    // This matches URIs of formats: host:port and protocol://host:port
+    // IPv6 is supported with the [ip] pattern
+    private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-z\\-.:]*)\\]?:([0-9]+)");
 
     public static final String NL = System.getProperty("line.separator");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index c899813..4b706d7 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -31,9 +31,9 @@ public class UtilsTest {
     @Test
     public void testGetHost() {
         assertEquals("127.0.0.1", getHost("127.0.0.1:8000"));
-        assertEquals("mydomain.com", getHost("mydomain.com:8080"));
+        assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080"));
         assertEquals("::1", getHost("[::1]:1234"));
-        assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
+        assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 1614260..80ee2fc 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -21,8 +21,10 @@ broker.id=0
 
 ############################# Socket Server Settings #############################
 
+listeners=PLAINTEXT://:9092
+
 # The port the socket server listens on
-port=9092
+#port=9092
 
 # Hostname the broker will bind to. If not set, the server will bind to all interfaces
 #host.name=localhost
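Multiple listeners are the point of this change; an illustrative configuration (not part of this diff), assuming the comma-separated list format and using TRACE, the only other protocol defined in this patch:

    # Hypothetical multi-listener setup: plaintext clients on 9092 plus a
    # test-only TRACE listener on 9093 (the second port is illustrative).
    listeners=PLAINTEXT://:9092,TRACE://:9093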

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index b700110..0d3332e 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,14 +18,16 @@
 package kafka.admin
 
 import kafka.common._
-import kafka.cluster.Broker
+import kafka.cluster.{BrokerEndpoint, Broker}
+
 import kafka.log.LogConfig
 import kafka.utils._
 import kafka.api.{TopicMetadata, PartitionMetadata}
 
 import java.util.Random
 import java.util.Properties
-import scala.Some
+import org.apache.kafka.common.protocol.SecurityProtocol
+
 import scala.Predef._
 import scala.collection._
 import mutable.ListBuffer
@@ -341,7 +343,9 @@ object AdminUtils extends Logging {
     topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
   }
 
-  private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
+
+
+  private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
     if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
       val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
       val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
@@ -352,22 +356,22 @@ object AdminUtils extends Logging {
         val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
         debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
-        var leaderInfo: Option[Broker] = None
-        var replicaInfo: Seq[Broker] = Nil
-        var isrInfo: Seq[Broker] = Nil
+        var leaderInfo: Option[BrokerEndpoint] = None
+        var replicaInfo: Seq[BrokerEndpoint] = Nil
+        var isrInfo: Seq[BrokerEndpoint] = Nil
         try {
           leaderInfo = leader match {
             case Some(l) =>
               try {
-                Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+                Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol))
               } catch {
                 case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
               }
             case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
           }
           try {
-            replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
-            isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
+            replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas).map(_.getBrokerEndPoint(protocol))
+            isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol))
           } catch {
             case e: Throwable => throw new ReplicaNotAvailableException(e)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index d430e1d..e36a9d1 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -25,11 +25,9 @@ import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
 import scala.collection.JavaConversions._
-import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.consumer.Whitelist
 import kafka.server.OffsetManager
-import org.apache.kafka.common.utils.Utils.formatAddress
 
 
 object TopicCommand {
@@ -202,9 +200,7 @@ object TopicCommand {
       }
     }
   }
-  
-  def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
-  
+
   def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
     val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
     require(configsToBeAdded.forall(config => config.length == 2),

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
new file mode 100644
index 0000000..a7c15f3
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+/**
+ * This class contains the different Kafka versions.
+ * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves.
+ * This is only for inter-broker communications - when communicating with clients, the client decides on the API version.
+ *
+ * Note that the ID we initialize for each version is important.
+ * We consider a version newer than another if it has a higher ID (to avoid depending on lexicographic order).
+ */
+object ApiVersion {
+  // This implicit is necessary due to: https://issues.scala-lang.org/browse/SI-8541
+  implicit def orderingByVersion[A <: ApiVersion]: Ordering[A] = Ordering.by(_.id)
+
+  private val versionNameMap = Map(
+    "0.8.0" -> KAFKA_080,
+    "0.8.1" -> KAFKA_081,
+    "0.8.2" -> KAFKA_082,
+    "0.8.3" -> KAFKA_083
+  )
+
+  def apply(version: String): ApiVersion  = versionNameMap(version.split("\\.").slice(0,3).mkString("."))
+
+  def latestVersion = versionNameMap.values.max
+}
+
+sealed trait ApiVersion extends Ordered[ApiVersion] {
+  val version: String
+  val id: Int
+
+  override def compare(that: ApiVersion): Int = {
+    ApiVersion.orderingByVersion.compare(this, that)
+  }
+
+  def onOrAfter(that: ApiVersion): Boolean = {
+    this.compare(that) >= 0
+  }
+
+  override def toString(): String = version
+}
+
+// Keep the IDs in order of versions
+case object KAFKA_080 extends ApiVersion {
+  val version: String = "0.8.0.X"
+  val id: Int = 0
+}
+
+case object KAFKA_081 extends ApiVersion {
+  val version: String = "0.8.1.X"
+  val id: Int = 1
+}
+
+case object KAFKA_082 extends ApiVersion {
+  val version: String = "0.8.2.X"
+  val id: Int = 2
+}
+
+case object KAFKA_083 extends ApiVersion {
+  val version: String = "0.8.3.X"
+  val id: Int = 3
+}
\ No newline at end of file
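A brief sketch of how the version ordering is meant to be used (it mirrors the onOrAfter check ControllerChannelManager performs further below; the object name is hypothetical):

    import kafka.api.{ApiVersion, KAFKA_082, KAFKA_083}

    object ApiVersionExample extends App {
      // apply() keeps only the first three version components,
      // so "0.8.2.1" resolves to KAFKA_082
      val configured = ApiVersion("0.8.2.1")
      println(configured == KAFKA_082)         // true
      // Versions compare by id, not lexicographically
      println(configured.onOrAfter(KAFKA_083)) // false
      println(ApiVersion.latestVersion)        // 0.8.3.X
    }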

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index 24aaf95..d2a3d43 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -18,18 +18,18 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import kafka.common.ErrorMapping
 
 object ConsumerMetadataResponse {
   val CurrentVersion = 0
 
-  private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))
+  private val NoBrokerEndpointOpt = Some(BrokerEndpoint(id = -1, host = "", port = -1))
   
   def readFrom(buffer: ByteBuffer) = {
     val correlationId = buffer.getInt
     val errorCode = buffer.getShort
-    val broker = Broker.readFrom(buffer)
+    val broker = BrokerEndpoint.readFrom(buffer)
     val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
       Some(broker)
     else
@@ -40,18 +40,18 @@ object ConsumerMetadataResponse {
   
 }
 
-case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int)
+case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndpoint], errorCode: Short, correlationId: Int)
   extends RequestOrResponse() {
 
   def sizeInBytes =
     4 + /* correlationId */
     2 + /* error code */
-    coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerOpt).get.sizeInBytes
+    coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
     buffer.putShort(errorCode)
-    coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer))
+    coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
   }
 
   def describe(details: Boolean) = toString

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 4ff7e8f..bf93632 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -21,7 +21,7 @@ package kafka.api
 import java.nio._
 import kafka.utils._
 import kafka.api.ApiUtils._
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.common.ErrorMapping
@@ -120,9 +120,9 @@ object LeaderAndIsrRequest {
     }
 
     val leadersCount = buffer.getInt
-    var leaders = Set[Broker]()
+    var leaders = Set[BrokerEndpoint]()
     for (i <- 0 until leadersCount)
-      leaders += Broker.readFrom(buffer)
+      leaders += BrokerEndpoint.readFrom(buffer)
 
     new LeaderAndIsrRequest(versionId, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
   }
@@ -134,10 +134,10 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 controllerId: Int,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker])
+                                leaders: Set[BrokerEndpoint])
     extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int,
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndpoint], controllerId: Int,
            controllerEpoch: Int, correlationId: Int, clientId: String) = {
     this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId,
          controllerId, controllerEpoch, partitionStateInfos, leaders)

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 0190076..6de447d 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -17,18 +17,17 @@
 
 package kafka.api
 
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
 import kafka.common._
-import org.apache.kafka.common.utils.Utils._
 
 object TopicMetadata {
   
   val NoLeaderNodeId = -1
 
-  def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): TopicMetadata = {
+  def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): TopicMetadata = {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
@@ -89,7 +88,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
 
 object PartitionMetadata {
 
-  def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): PartitionMetadata = {
+  def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): PartitionMetadata = {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
     val leaderId = buffer.getInt
@@ -110,9 +109,9 @@ object PartitionMetadata {
 }
 
 case class PartitionMetadata(partitionId: Int, 
-                             val leader: Option[Broker], 
-                             replicas: Seq[Broker], 
-                             isr: Seq[Broker] = Seq.empty,
+                             val leader: Option[BrokerEndpoint],
+                             replicas: Seq[BrokerEndpoint],
+                             isr: Seq[BrokerEndpoint] = Seq.empty,
                              errorCode: Short = ErrorMapping.NoError) extends Logging {
   def sizeInBytes: Int = {
     2 /* error code */ + 
@@ -142,14 +141,13 @@ case class PartitionMetadata(partitionId: Int,
   override def toString(): String = {
     val partitionMetadataString = new StringBuilder
     partitionMetadataString.append("\tpartition " + partitionId)
-    partitionMetadataString.append("\tleader: " + (if(leader.isDefined) formatBroker(leader.get) else "none"))
-    partitionMetadataString.append("\treplicas: " + replicas.map(formatBroker).mkString(","))
-    partitionMetadataString.append("\tisr: " + isr.map(formatBroker).mkString(","))
+    partitionMetadataString.append("\tleader: " + (if(leader.isDefined) leader.get.toString else "none"))
+    partitionMetadataString.append("\treplicas: " + replicas.mkString(","))
+    partitionMetadataString.append("\tisr: " + isr.mkString(","))
     partitionMetadataString.append("\tisUnderReplicated: %s".format(if(isr.size < replicas.size) "true" else "false"))
     partitionMetadataString.toString()
   }
 
-  private def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index 92ac4e6..776b604 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import java.nio.ByteBuffer
 
 object TopicMetadataResponse {
@@ -25,7 +25,7 @@ object TopicMetadataResponse {
   def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
     val correlationId = buffer.getInt
     val brokerCount = buffer.getInt
-    val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer))
+    val brokers = (0 until brokerCount).map(_ => BrokerEndpoint.readFrom(buffer))
     val brokerMap = brokers.map(b => (b.id, b)).toMap
     val topicCount = buffer.getInt
     val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
@@ -33,7 +33,7 @@ object TopicMetadataResponse {
   }
 }
 
-case class TopicMetadataResponse(brokers: Seq[Broker],
+case class TopicMetadataResponse(brokers: Seq[BrokerEndpoint],
                                  topicsMetadata: Seq[TopicMetadata],
                                  correlationId: Int)
     extends RequestOrResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 530982e..a91e3a6 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package kafka.api
+package kafka.api
 
 import java.nio.ByteBuffer
+
 import kafka.api.ApiUtils._
-import kafka.cluster.Broker
-import kafka.common.{ErrorMapping, TopicAndPartition}
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.cluster.{Broker, BrokerEndpoint}
+import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition}
 import kafka.network.RequestChannel.Response
-import collection.Set
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import org.apache.kafka.common.protocol.SecurityProtocol
+
+import scala.collection.Set
 
 object UpdateMetadataRequest {
-  val CurrentVersion = 0.shortValue
+  val CurrentVersion = 1.shortValue
   val IsInit: Boolean = true
   val NotInit: Boolean = false
   val DefaultAckTimeout: Int = 1000
@@ -48,7 +51,13 @@ object UpdateMetadataRequest {
     }
 
     val numAliveBrokers = buffer.getInt
-    val aliveBrokers = for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer)
+
+    val aliveBrokers = versionId match {
+      case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndpoint.readFrom(buffer),SecurityProtocol.PLAINTEXT)
+      case 1 => for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer)
+      case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
+    }
+
     new UpdateMetadataRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
       partitionStateInfos.toMap, aliveBrokers.toSet)
   }
@@ -82,7 +91,12 @@ case class UpdateMetadataRequest (versionId: Short,
       value.writeTo(buffer)
     }
     buffer.putInt(aliveBrokers.size)
-    aliveBrokers.foreach(_.writeTo(buffer))
+
+    versionId match {
+      case 0 => aliveBrokers.foreach(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).writeTo(buffer))
+      case 1 => aliveBrokers.foreach(_.writeTo(buffer))
+      case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
+    }
   }
 
   def sizeInBytes(): Int = {
@@ -96,8 +110,15 @@ case class UpdateMetadataRequest (versionId: Short,
     for((key, value) <- partitionStateInfos)
       size += (2 + key.topic.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
     size += 4 /* number of alive brokers in the cluster */
-    for(broker <- aliveBrokers)
-      size += broker.sizeInBytes /* broker info */
+
+    versionId match  {
+      case 0 => for(broker <- aliveBrokers)
+        size += broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).sizeInBytes /* broker info */
+      case 1 => for(broker  <- aliveBrokers)
+        size += broker.sizeInBytes
+      case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
+    }
+
     size
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ebba87f..ad4c9d2 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package kafka.client
+package kafka.client
+
+import org.apache.kafka.common.protocol.SecurityProtocol
 
 import scala.collection._
 import kafka.cluster._
@@ -24,11 +26,10 @@ import kafka.common.{ErrorMapping, KafkaException}
 import kafka.utils.{Utils, Logging}
 import java.util.Properties
 import util.Random
- import kafka.network.BlockingChannel
- import kafka.utils.ZkUtils._
- import org.I0Itec.zkclient.ZkClient
- import java.io.IOException
-import org.apache.kafka.common.utils.Utils.{getHost, getPort}
+import kafka.network.BlockingChannel
+import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.ZkClient
+import java.io.IOException
 
  /**
  * Helper functions common to clients (producer, consumer, or admin)
@@ -42,7 +43,7 @@ object ClientUtils extends Logging{
    * @param producerConfig The producer's config
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
     val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
@@ -83,7 +84,7 @@ object ClientUtils extends Logging{
    * @param clientId The client's identifier
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], clientId: String, timeoutMs: Int,
                          correlationId: Int = 0): TopicMetadataResponse = {
     val props = new Properties()
     props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
@@ -96,11 +97,11 @@ object ClientUtils extends Logging{
   /**
    * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
    */
-  def parseBrokerList(brokerListStr: String): Seq[Broker] = {
+  def parseBrokerList(brokerListStr: String): Seq[BrokerEndpoint] = {
     val brokersStr = Utils.parseCsvList(brokerListStr)
 
     brokersStr.zipWithIndex.map { case (address, brokerId) =>
-      new Broker(brokerId, getHost(address), getPort(address))
+      BrokerEndpoint.createBrokerEndPoint(brokerId, address)
     }
   }
 
@@ -111,7 +112,7 @@ object ClientUtils extends Logging{
      var channel: BlockingChannel = null
      var connected = false
      while (!connected) {
-       val allBrokers = getAllBrokersInCluster(zkClient)
+       val allBrokers = getAllBrokerEndPointsForChannel(zkClient, SecurityProtocol.PLAINTEXT)
        Random.shuffle(allBrokers).find { broker =>
          trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
          try {
@@ -143,7 +144,7 @@ object ClientUtils extends Logging{
 
      while (!offsetManagerChannelOpt.isDefined) {
 
-       var coordinatorOpt: Option[Broker] = None
+       var coordinatorOpt: Option[BrokerEndpoint] = None
 
        while (!coordinatorOpt.isDefined) {
          try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 0060add..3933bb3 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -17,18 +17,42 @@
 
 package kafka.cluster
 
-import kafka.utils.Utils._
-import kafka.utils.Json
-import kafka.api.ApiUtils._
 import java.nio.ByteBuffer
-import kafka.common.{KafkaException, BrokerNotAvailableException}
-import org.apache.kafka.common.utils.Utils._
+
+import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
+import kafka.utils.Json
+import kafka.utils.Utils._
+import org.apache.kafka.common.protocol.SecurityProtocol
 
 /**
- * A Kafka broker
+ * A Kafka broker.
+ * A broker has an id and a collection of end-points.
+ * Each end-point is a (host, port, protocolType) triple.
+ * Currently the only protocol type is PLAINTEXT, but SSL and Kerberos support will be added in the future.
  */
 object Broker {
 
+  /**
+   * Create a broker object from id and JSON string.
+   * @param id broker id
+   * @param brokerInfoString JSON-encoded broker registration, as stored in ZooKeeper
+   *
+   * Version 1 JSON schema for a broker is:
+   * {"version":1,
+   *  "host":"localhost",
+   *  "port":9092
+   *  "jmx_port":9999,
+   *  "timestamp":"2233345666" }
+   *
+   * The current JSON schema for a broker is:
+   * {"version":2,
+   *  "host","localhost",
+   *  "port",9092
+   *  "jmx_port":9999,
+   *  "timestamp":"2233345666",
+   *  "endpoints": ["PLAINTEXT://host1:9092",
+   *                "SSL://host1:9093"]
+   */
   def createBroker(id: Int, brokerInfoString: String): Broker = {
     if(brokerInfoString == null)
       throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
@@ -36,9 +60,21 @@ object Broker {
       Json.parseFull(brokerInfoString) match {
         case Some(m) =>
           val brokerInfo = m.asInstanceOf[Map[String, Any]]
-          val host = brokerInfo.get("host").get.asInstanceOf[String]
-          val port = brokerInfo.get("port").get.asInstanceOf[Int]
-          new Broker(id, host, port)
+          val version = brokerInfo("version").asInstanceOf[Int]
+          val endpoints = version match {
+            case 1 =>
+              val host = brokerInfo("host").asInstanceOf[String]
+              val port = brokerInfo("port").asInstanceOf[Int]
+              Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT))
+            case 2 =>
+              val listeners = brokerInfo("endpoints").asInstanceOf[List[String]]
+              listeners.map(listener => {
+                val ep = EndPoint.createEndPoint(listener)
+                (ep.protocolType, ep)
+              }).toMap
+            case _ => throw new KafkaException("Unknown version of broker registration. Only versions 1 and 2 are supported." + brokerInfoString)
+          }
+          new Broker(id, endpoints)
         case None =>
           throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
       }
@@ -47,36 +83,70 @@ object Broker {
     }
   }
 
+  /**
+   *
+   * @param buffer Containing serialized broker.
+   *               Current serialization is:
+   *               id (int), number of endpoints (int), serialized endpoints
+   * @return broker object
+   */
   def readFrom(buffer: ByteBuffer): Broker = {
     val id = buffer.getInt
-    val host = readShortString(buffer)
-    val port = buffer.getInt
-    new Broker(id, host, port)
+    val numEndpoints = buffer.getInt
+
+    val endpoints = List.range(0, numEndpoints).map(i => EndPoint.readFrom(buffer))
+            .map(ep => ep.protocolType -> ep).toMap
+    new Broker(id, endpoints)
   }
 }
 
-case class Broker(id: Int, host: String, port: Int) {
-  
-  override def toString: String = "id:" + id + ",host:" + host + ",port:" + port
+case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) {
+
+  override def toString: String = id + " : " + endPoints.values.mkString("(",",",")")
+
+  def this(id: Int, host: String, port: Int, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
+    this(id, Map(protocol -> EndPoint(host, port, protocol)))
+  }
+
+  def this(bep: BrokerEndpoint, protocol: SecurityProtocol) = {
+    this(bep.id, bep.host, bep.port, protocol)
+  }
 
-  def connectionString: String = formatAddress(host, port)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)
-    writeShortString(buffer, host)
-    buffer.putInt(port)
+    buffer.putInt(endPoints.size)
+    for(endpoint <- endPoints.values) {
+      endpoint.writeTo(buffer)
+    }
   }
 
-  def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
+  def sizeInBytes: Int =
+    4 + /* broker id*/
+    4 + /* number of endPoints */
+    endPoints.values.map(_.sizeInBytes).sum /* end points */
+
+  def supportsChannel(protocolType: SecurityProtocol): Boolean = {
+    endPoints.contains(protocolType)
+  }
+
+  def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndpoint = {
+    val endpoint = endPoints.get(protocolType)
+    endpoint match {
+      case Some(endpoint) => new BrokerEndpoint(id, endpoint.host, endpoint.port)
+      case None =>
+        throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id))
+    }
+  }
 
   override def equals(obj: Any): Boolean = {
     obj match {
       case null => false
-      case n: Broker => id == n.id && host == n.host && port == n.port
+      // Scala compares the endpoint maps element by element
+      case n: Broker => id == n.id && endPoints == n.endPoints
       case _ => false
     }
   }
-  
-  override def hashCode(): Int = hashcode(id, host, port)
-  
-}
+
+  override def hashCode(): Int = hashcode(id, endPoints)
+}
\ No newline at end of file
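A sketch of the refactored Broker surface (names below are only for illustration): a broker now carries a map of endpoints keyed by protocol, and getBrokerEndPoint projects one of them into the protocol-free BrokerEndpoint used on the wire:

    import kafka.cluster.Broker
    import org.apache.kafka.common.protocol.SecurityProtocol

    object BrokerExample extends App {
      // Version-1-style broker: a single PLAINTEXT endpoint
      val broker = new Broker(0, "localhost", 9092)

      // Project the endpoint for a protocol; throws
      // BrokerEndPointNotAvailableException when it is missing
      val bep = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
      println(bep.connectionString()) // localhost:9092
    }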

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
new file mode 100644
index 0000000..22dba18
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.KafkaException
+import org.apache.kafka.common.utils.Utils._
+
+object BrokerEndpoint {
+  def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndpoint = {
+
+    // BrokerEndPoint URI is host:port or [ipv6_host]:port
+    // Note that unlike EndPoint (or listener) this URI has no security information.
+    val uriParseExp = """\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r
+
+    connectionString match {
+      case uriParseExp(host, port) => new BrokerEndpoint(brokerId, host, port.toInt)
+      case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
+    }
+  }
+
+  def readFrom(buffer: ByteBuffer): BrokerEndpoint = {
+    val brokerId = buffer.getInt()
+    val host = readShortString(buffer)
+    val port = buffer.getInt()
+    BrokerEndpoint(brokerId, host, port)
+  }
+}
+
+/**
+ * BrokerEndpoint is used to connect to specific host:port pair.
+ * It is typically used by clients (or brokers when connecting to other brokers)
+ * and contains no information about the security protocol used on the connection.
+ * Clients should know which security protocol to use from configuration.
+ * This allows us to keep the wire protocol with the clients unchanged where the protocol is not needed.
+ */
+case class BrokerEndpoint(id: Int, host: String, port: Int) {
+
+  def connectionString(): String = formatAddress(host, port)
+
+  def writeTo(buffer: ByteBuffer): Unit = {
+    buffer.putInt(id)
+    writeShortString(buffer, host)
+    buffer.putInt(port)
+  }
+
+  def sizeInBytes: Int =
+    4 + /* broker Id */
+    4 + /* port */
+    shortStringLength(host)
+}
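A small sketch of the parsing helper above (object name hypothetical); note the bracketed IPv6 form, and that the brackets are stripped from the stored host:

    import kafka.cluster.BrokerEndpoint

    object BrokerEndpointExample extends App {
      // Plain host:port
      val b1 = BrokerEndpoint.createBrokerEndPoint(0, "myhost:9092")
      println(b1.host + ":" + b1.port) // myhost:9092

      // IPv6 hosts are bracketed in the connection string;
      // connectionString() re-adds the brackets via formatAddress
      val b2 = BrokerEndpoint.createBrokerEndPoint(1, "[::1]:9092")
      println(b2.connectionString()) // [::1]:9092
    }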

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/cluster/EndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
new file mode 100644
index 0000000..3286f6d
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.cluster
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.KafkaException
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.utils.Utils
+
+object EndPoint {
+
+  def readFrom(buffer: ByteBuffer): EndPoint = {
+    val port = buffer.getInt()
+    val host = readShortString(buffer)
+    val protocol = buffer.getShort()
+    EndPoint(host, port, SecurityProtocol.forId(protocol))
+  }
+
+  /**
+   * Create EndPoint object from connectionString
+   * @param connectionString the format is protocol://host:port or protocol://[ipv6 host]:port
+   *                         for example: PLAINTEXT://myhost:9092 or PLAINTEXT://[::1]:9092
+   *                         Host can be empty (PLAINTEXT://:9092), in which case we'll bind to the default interface
+   *                         Negative ports are also accepted, since they are used in some unit tests
+   * @return the parsed EndPoint
+   */
+  def createEndPoint(connectionString: String): EndPoint = {
+    val uriParseExp = """^(.*)://\[?([0-9a-z\-.:]*)\]?:(-?[0-9]+)""".r
+    connectionString match {
+      case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.valueOf(protocol))
+      case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.valueOf(protocol))
+      case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
+    }
+  }
+}
+
+/**
+ * Part of the broker definition - matching host/port pair to a protocol
+ */
+case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) {
+
+  def connectionString(): String = {
+    val hostport =
+      if (host == null)
+        ":"+port
+      else
+        Utils.formatAddress(host, port)
+    protocolType + "://" + hostport
+  }
+
+  def writeTo(buffer: ByteBuffer): Unit = {
+    buffer.putInt(port)
+    writeShortString(buffer, host)
+    buffer.putShort(protocolType.id.toShort)
+  }
+
+  def sizeInBytes: Int =
+    4 + /* port */
+    shortStringLength(host) +
+    2 /* protocol id */
+}
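And a matching sketch for listener strings (object name hypothetical), including the empty-host wildcard form described in the scaladoc above:

    import kafka.cluster.EndPoint
    import org.apache.kafka.common.protocol.SecurityProtocol

    object EndPointExample extends App {
      val ep = EndPoint.createEndPoint("PLAINTEXT://myhost:9092")
      println(ep.protocolType == SecurityProtocol.PLAINTEXT) // true
      println(ep.connectionString()) // PLAINTEXT://myhost:9092

      // Empty host means "bind to the default interface"; host is stored as null
      val wildcard = EndPoint.createEndPoint("PLAINTEXT://:9092")
      println(wildcard.connectionString()) // PLAINTEXT://:9092
    }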

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala b/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala
new file mode 100644
index 0000000..455d8c6
--- /dev/null
+++ b/core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class BrokerEndPointNotAvailableException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..484a57f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -19,9 +19,9 @@ package kafka.consumer
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
-import kafka.cluster.{Cluster, Broker}
+import kafka.cluster.{BrokerEndpoint, Cluster}
+import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection.immutable
-import scala.collection.Map
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
@@ -53,7 +53,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
     override def doWork() {
-      val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
+      val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndpoint]
       lock.lock()
       try {
         while (noLeaderPartitionSet.isEmpty) {
@@ -62,7 +62,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         }
 
         trace("Partitions without leader %s".format(noLeaderPartitionSet))
-        val brokers = getAllBrokersInCluster(zkClient)
+        val brokers = getAllBrokerEndPointsForChannel(zkClient, SecurityProtocol.PLAINTEXT)
         val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                             brokers,
                                                             config.clientId,
@@ -114,7 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     }
   }
 
-  override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = {
     new ConsumerFetcherThread(
       "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
       config, sourceBroker, partitionMap, this)

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 152fda5..cde4481 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -17,7 +17,7 @@
 
 package kafka.consumer
 
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData}
@@ -26,7 +26,7 @@ import kafka.common.TopicAndPartition
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
-                            sourceBroker: Broker,
+                            sourceBroker: BrokerEndpoint,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name,

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index b1cf0db..3e8a75b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -185,7 +185,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private def ensureOffsetManagerConnected() {
     if (config.offsetsStorage == "kafka") {
       if (offsetsChannel == null || !offsetsChannel.isConnected)
-        offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient, config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs)
+        offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient,
+          config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs)
 
       debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c582191..fb596ba 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -24,8 +24,6 @@ import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
 import kafka.server.KafkaConfig
 import collection.mutable
 import kafka.api._
-import org.apache.log4j.Logger
-import scala.Some
 import kafka.common.TopicAndPartition
 import kafka.api.RequestOrResponse
 import collection.Set
@@ -80,7 +78,8 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
     debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
-    val channel = new BlockingChannel(broker.host, broker.port,
+    val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
+    val channel = new BlockingChannel(brokerEndPoint.host, brokerEndPoint.port,
       BlockingChannel.UseDefaultBufferSize,
       BlockingChannel.UseDefaultBufferSize,
       config.controllerSocketTimeoutMs)
@@ -284,7 +283,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       val broker = m._1
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
-      val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
+      val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
       val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
@@ -299,8 +298,10 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
     updateMetadataRequestMap.foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2.toMap
-      val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
-        partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
+
+      val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0
+      val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch,
+        correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers)
       partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
         "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
         correlationId, broker, p._1)))
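
Two changes are folded into this file. First, the controller no longer assumes one host/port per broker: it resolves the endpoint matching config.interBrokerSecurityProtocol before opening its BlockingChannel, and projects leaders to endpoints the same way for LeaderAndIsrRequest. Second, UpdateMetadataRequest is sent as wire version 1 only when interBrokerProtocolVersion is on or after KAFKA_083, so brokers that have not been upgraded still receive version 0. A sketch of the resolution step in isolation (getBrokerEndPoint is the accessor added by this patch; the method wrapper is illustrative only):

    import kafka.cluster.Broker
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Resolve the single endpoint the controller should use for inter-broker
    // traffic; the BlockingChannel above is opened against exactly this host/port.
    def interBrokerHostPort(broker: Broker, protocol: SecurityProtocol): (String, Int) = {
      val endPoint = broker.getBrokerEndPoint(protocol)
      (endPoint.host, endPoint.port)
    }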

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 09fc46d..f2e62f5 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -19,7 +19,6 @@ package kafka.controller
 import collection._
 import collection.Set
 import com.yammer.metrics.core.Gauge
-import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
 import kafka.admin.AdminUtils
 import kafka.admin.PreferredReplicaLeaderElectionCommand
@@ -36,9 +35,7 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
-import scala.None
 import kafka.server._
-import scala.Some
 import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
@@ -212,7 +209,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
 
   def epoch = controllerContext.epoch
 
-  def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
+  def clientId = {
+    val listeners = config.listeners
+    val controllerListener = listeners.get(config.interBrokerSecurityProtocol)
+    "id_%d-host_%s-port_%d".format(config.brokerId, controllerListener.get.host, controllerListener.get.port)
+  }
 
   /**
    * On clean shutdown, the controller first determines the partitions that the

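clientId can no longer be built from a global config.hostName and config.port; it is now derived from the listener matching the inter-broker security protocol. Note that listeners.get(...) returns an Option that the patch unwraps twice with .get; a behaviorally equivalent sketch with a single unwrap and an explicit failure message (not part of the patch, shown only for clarity):

    import kafka.cluster.EndPoint
    import org.apache.kafka.common.protocol.SecurityProtocol

    def clientId(brokerId: Int,
                 listeners: Map[SecurityProtocol, EndPoint],
                 interBrokerSecurityProtocol: SecurityProtocol): String = {
      // Fail fast if no listener is configured for the inter-broker protocol.
      val listener = listeners.getOrElse(interBrokerSecurityProtocol,
        throw new IllegalStateException("No listener for " + interBrokerSecurityProtocol))
      "id_%d-host_%s-port_%d".format(brokerId, listener.host, listener.port)
    }
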
http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
index d281bb3..9c14428 100644
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -18,14 +18,13 @@
 package kafka.javaapi
 
 import java.nio.ByteBuffer
-
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 
 class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
 
   def errorCode = underlying.errorCode
 
-  def coordinator: Broker = {
+  def coordinator: BrokerEndpoint = {
     import kafka.javaapi.Implicits._
     underlying.coordinatorOpt
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index f384e04..ebbd589 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -16,7 +16,7 @@
  */
 package kafka.javaapi
 
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import scala.collection.JavaConversions
 
 private[javaapi] object MetadataListImplicits {
@@ -52,17 +52,17 @@ class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
 class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
   def partitionId: Int = underlying.partitionId
 
-  def leader: Broker = {
+  def leader: BrokerEndpoint = {
     import kafka.javaapi.Implicits._
     underlying.leader
   }
 
-  def replicas: java.util.List[Broker] = {
+  def replicas: java.util.List[BrokerEndpoint] = {
     import JavaConversions._
     underlying.replicas
   }
 
-  def isr: java.util.List[Broker] = {
+  def isr: java.util.List[BrokerEndpoint] = {
     import JavaConversions._
     underlying.isr
   }
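
The javaapi wrappers change their exposed types from Broker to BrokerEndpoint, so Java clients reading partition metadata see exactly one (id, host, port) per broker, selected by the protocol of the connection that served the request. A hypothetical consumer of the updated API (Scala syntax for consistency with the rest of the patch):

    import kafka.javaapi.PartitionMetadata

    // leader is now a kafka.cluster.BrokerEndpoint, so host and port are read directly.
    def describeLeader(pm: PartitionMetadata): String = {
      val leader = pm.leader
      "partition %d led by %s:%d".format(pm.partitionId, leader.host, leader.port)
    }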

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index bc73540..1d9c57b 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -26,12 +26,13 @@ import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, SystemTime}
 import kafka.message.ByteBufferMessageSet
 import java.net._
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
 import org.apache.log4j.Logger
 
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+  val AllDone = new Request(processor = 1, requestKey = 2, buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
 
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -42,7 +43,7 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
+  case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0), securityProtocol: SecurityProtocol) {
     @volatile var requestDequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L

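Request now records the SecurityProtocol of the socket it arrived on, and the AllDone shutdown sentinel is built with named arguments including an explicit PLAINTEXT protocol. A sketch of constructing the extended Request, mirroring how AllDone is built above (getShutdownReceive() yields a valid serialized empty producer request, which the constructor body can deserialize):

    import java.net.InetSocketAddress
    import kafka.network.RequestChannel
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Only securityProtocol is new relative to the old five-argument signature.
    val req = RequestChannel.Request(
      processor = 0,
      requestKey = "selection-key",               // requestKey is typed Any
      buffer = RequestChannel.getShutdownReceive(),
      startTimeMs = System.currentTimeMillis,
      remoteAddress = new InetSocketAddress(0),
      securityProtocol = SecurityProtocol.PLAINTEXT)
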
http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 578283b..0ad9057 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -24,6 +24,9 @@ import java.net._
 import java.io._
 import java.nio.channels._
 
+import kafka.cluster.EndPoint
+import org.apache.kafka.common.protocol.SecurityProtocol
+
 import scala.collection._
 
 import kafka.common.KafkaException
@@ -38,8 +41,7 @@ import com.yammer.metrics.core.{Gauge, Meter}
  *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
  */
 class SocketServer(val brokerId: Int,
-                   val host: String,
-                   private val port: Int,
+                   val endpoints: Map[SecurityProtocol, EndPoint],
                    val numProcessorThreads: Int,
                    val maxQueuedRequests: Int,
                    val sendBufferSize: Int,
@@ -51,28 +53,41 @@ class SocketServer(val brokerId: Int,
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
-  @volatile private var acceptor: Acceptor = null
+  private[network] var acceptors =  mutable.Map[EndPoint,Acceptor]()
   val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
   /* a meter to track the average free capacity of the network processors */
   private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
 
+
+  /* We push the port-to-protocol mapping down to the processor level,
+     so the processor can put the correct protocol in the request channel.
+     We'll probably have a more elegant way of doing this once we patch the request channel
+     to include more information about security and authentication.
+     TODO: reconsider this code when working on KAFKA-1683
+   */
+  private val portToProtocol: ConcurrentHashMap[Int, SecurityProtocol] = new ConcurrentHashMap[Int, SecurityProtocol]()
+
   /**
    * Start the socket server
    */
   def startup() {
     val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
-    for(i <- 0 until numProcessorThreads) {
-      processors(i) = new Processor(i, 
-                                    time, 
-                                    maxRequestSize, 
-                                    aggregateIdleMeter,
-                                    newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
-                                    numProcessorThreads, 
-                                    requestChannel,
-                                    quotas,
-                                    connectionsMaxIdleMs)
-      Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
+
+    this.synchronized {
+      for (i <- 0 until numProcessorThreads) {
+        processors(i) = new Processor(i,
+          time,
+          maxRequestSize,
+          aggregateIdleMeter,
+          newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
+          numProcessorThreads,
+          requestChannel,
+          quotas,
+          connectionsMaxIdleMs,
+          portToProtocol)
+        Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start();
+      }
     }
 
     newGauge("ResponsesBeingSent", new Gauge[Int] {
@@ -83,10 +98,19 @@ class SocketServer(val brokerId: Int,
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
    
     // start accepting connections
-    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
-    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
-    acceptor.awaitStartup
-    info("Started")
+    // Right now we use the same processors for all ports, since we haven't implemented per-protocol handling;
+    // in the future we may use different processors for SSL and Kerberos.
+
+    this.synchronized {
+      endpoints.values.foreach(endpoint => {
+        val acceptor = new Acceptor(endpoint.host, endpoint.port, processors, sendBufferSize, recvBufferSize, quotas, endpoint.protocolType, portToProtocol)
+        acceptors.put(endpoint, acceptor)
+        Utils.newThread("kafka-socket-acceptor-%s-%d".format(endpoint.protocolType.toString, endpoint.port), acceptor, false).start()
+        acceptor.awaitStartup
+      })
+    }
+
+    info("Started " + acceptors.size + " acceptor threads")
   }
 
   /**
@@ -94,17 +118,19 @@ class SocketServer(val brokerId: Int,
    */
   def shutdown() = {
     info("Shutting down")
-    if(acceptor != null)
-      acceptor.shutdown()
-    for(processor <- processors)
-      processor.shutdown()
+    this.synchronized {
+      acceptors.values.foreach(_.shutdown)
+      processors.foreach(_.shutdown)
+    }
     info("Shutdown completed")
   }
 
-  def boundPort(): Int = {
-    if (acceptor == null)
-      throw new KafkaException("Tried to check server's port before server was started")
-    acceptor.serverChannel.socket().getLocalPort
+  def boundPort(protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int = {
+    try {
+      acceptors(endpoints(protocol)).serverChannel.socket().getLocalPort
+    } catch {
+      case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
+    }
   }
 }
 
@@ -207,8 +233,11 @@ private[kafka] class Acceptor(val host: String,
                               private val processors: Array[Processor],
                               val sendBufferSize: Int, 
                               val recvBufferSize: Int,
-                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
+                              connectionQuotas: ConnectionQuotas,
+                              protocol: SecurityProtocol,
+                              portToProtocol: ConcurrentHashMap[Int, SecurityProtocol]) extends AbstractServerThread(connectionQuotas) {
   val serverChannel = openServerSocket(host, port)
+  portToProtocol.put(serverChannel.socket().getLocalPort, protocol)
 
   /**
    * Accept loop that checks for new connection attempts
@@ -307,7 +336,8 @@ private[kafka] class Processor(val id: Int,
                                val totalProcessorThreads: Int,
                                val requestChannel: RequestChannel,
                                connectionQuotas: ConnectionQuotas,
-                               val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {
+                               val connectionsMaxIdleMs: Long,
+                               val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol]) extends AbstractServerThread(connectionQuotas) {
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000
@@ -453,7 +483,9 @@ private[kafka] class Processor(val id: Int,
     if(read < 0) {
       close(key)
     } else if(receive.complete) {
-      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
+      val port = socketChannel.socket().getLocalPort
+      val protocol = portToProtocol.get(port)
+      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address, securityProtocol = protocol)
       requestChannel.sendRequest(req)
       key.attach(null)
       // explicitly reset interest ops to not READ, no need to wake up the selector just yet

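SocketServer now takes a map of SecurityProtocol to EndPoint and starts one Acceptor per endpoint, all feeding the shared processor pool; each Acceptor registers its bound port in portToProtocol so a Processor can tag incoming requests with the right protocol, and boundPort is looked up per protocol. A sketch of building such an endpoint map, assuming EndPoint is constructed from (host, port, protocolType) as its field accesses here suggest, and assuming the SecurityProtocol enum defines SSL (hosts and ports are hypothetical):

    import kafka.cluster.EndPoint
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Two listeners on the same host; each gets its own acceptor thread,
    // while the Processor array is shared across both.
    val endpoints: Map[SecurityProtocol, EndPoint] = Map(
      SecurityProtocol.PLAINTEXT -> new EndPoint("broker1.example.com", 9092, SecurityProtocol.PLAINTEXT),
      SecurityProtocol.SSL       -> new EndPoint("broker1.example.com", 9093, SecurityProtocol.SSL))
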
http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 43df70b..07feb05 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -17,20 +17,21 @@
 
 package kafka.producer
 
-import kafka.cluster.Broker
 import java.util.Properties
-import collection.mutable.HashMap
-import java.lang.Object
-import kafka.utils.Logging
+
 import kafka.api.TopicMetadata
+import kafka.cluster.BrokerEndpoint
 import kafka.common.UnavailableProducerException
+import kafka.utils.Logging
+
+import scala.collection.mutable.HashMap
 
 
 object ProducerPool {
   /**
    * Used in ProducerPool to initiate a SyncProducer connection with a broker.
    */
-  def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
+  def createSyncProducer(config: ProducerConfig, broker: BrokerEndpoint): SyncProducer = {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
@@ -44,11 +45,12 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
   private val lock = new Object()
 
   def updateProducer(topicMetadata: Seq[TopicMetadata]) {
-    val newBrokers = new collection.mutable.HashSet[Broker]
+    val newBrokers = new collection.mutable.HashSet[BrokerEndpoint]
     topicMetadata.foreach(tmd => {
       tmd.partitionsMetadata.foreach(pmd => {
-        if(pmd.leader.isDefined)
-          newBrokers+=(pmd.leader.get)
+        if(pmd.leader.isDefined) {
+          newBrokers += pmd.leader.get
+        }
       })
     })
     lock synchronized {

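ProducerPool follows the same pattern: createSyncProducer receives an already-resolved BrokerEndpoint and copies its host and port into the producer Properties, and updateProducer collects each partition leader into a set of endpoints. A hypothetical call, again assuming BrokerEndpoint is a case class over (id, host, port); producerConfig stands in for a ProducerConfig already in scope:

    import kafka.cluster.BrokerEndpoint

    // Hypothetical leader endpoint; createSyncProducer copies exactly these two
    // fields into the "host" and "port" producer properties above.
    val leader = BrokerEndpoint(id = 1, host = "broker1.example.com", port = 9092)
    val producer = ProducerPool.createSyncProducer(producerConfig, leader)
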
http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 20c00cb..94aa952 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import scala.collection.Set
 import scala.collection.Map
 import kafka.utils.{Utils, Logging}
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
@@ -68,7 +68,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   }
 
   // to be defined in subclass to create a specific fetcher
-  def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
+  def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread
 
   def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
     mapLock synchronized {
@@ -126,6 +126,6 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   }
 }
 
-case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
+case class BrokerAndFetcherId(broker: BrokerEndpoint, fetcherId: Int)
 
-case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
\ No newline at end of file
+case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 5d5cf58..93f67d5 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.cluster.Broker
+import kafka.cluster.BrokerEndpoint
 import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
@@ -36,7 +36,8 @@ import com.yammer.metrics.core.Gauge
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndpoint, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f372af7..b4004aa 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,9 +17,9 @@
 
 package kafka.server
 
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader}
 import org.apache.kafka.common.TopicPartition
-
 import kafka.api._
 import kafka.admin.AdminUtils
 import kafka.common._
@@ -393,8 +393,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     ret.toSeq.sortBy(- _)
   }
 
-  private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
-    val topicResponses = metadataCache.getTopicMetadata(topics)
+  private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[TopicMetadata] = {
+    val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol)
     if (topics.size > 0 && topicResponses.size != topics.size) {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
@@ -436,10 +436,10 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
     val brokers = metadataCache.getAliveBrokers
     trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
-    val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
+    val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -476,7 +476,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val partition = offsetManager.partitionFor(consumerMetadataRequest.group)
 
     // get metadata (and create the topic if necessary)
-    val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName)).head
+    val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).head
 
     val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
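
Metadata responses are now rendered per connection: getTopicMetadata threads request.securityProtocol through to the metadata cache, and the broker list in TopicMetadataResponse is mapped through getBrokerEndPoint, so a client only ever sees the endpoint for the protocol it connected with. A sketch of that projection step in isolation (the map call is taken from the patch; the method wrapper is illustrative):

    import kafka.cluster.{Broker, BrokerEndpoint}
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Reduce each live Broker to the single endpoint matching the caller's protocol.
    def endpointsFor(aliveBrokers: Seq[Broker],
                     protocol: SecurityProtocol): Seq[BrokerEndpoint] =
      aliveBrokers.map(_.getBrokerEndPoint(protocol))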
 

