kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: kafka-2099; BrokerEndPoint file, methods and object names should match; patched by Gwen Shapira; reviewed by Sriharsha Chintalapani and Jun Rao
Date Tue, 07 Apr 2015 01:20:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 75e1cc8bc -> 04fce48e1


kafka-2099; BrokerEndPoint file, methods and object names should match; patched by Gwen Shapira; reviewed by Sriharsha Chintalapani and Jun Rao
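
The change is mechanical: the case class and companion object formerly spelled "BrokerEndpoint" are renamed to "BrokerEndPoint" so they match the file name (BrokerEndPoint.scala) and existing method names such as createBrokerEndPoint and getBrokerEndPoint; no behavior changes. A minimal sketch of the renamed API as it appears in this diff (the host and port values are illustrative):

    import kafka.cluster.BrokerEndPoint

    // Construct an endpoint directly, as the updated tests in this commit do.
    val endpoint = BrokerEndPoint(id = 0, host = "localhost", port = 9092)

    // connectionString() renders "host:port" via formatAddress.
    println(endpoint.connectionString())  // prints localhost:9092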


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04fce48e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04fce48e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04fce48e

Branch: refs/heads/trunk
Commit: 04fce48e1af183d5133be1044d27e0f526e94f2a
Parents: 75e1cc8
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Mon Apr 6 18:20:45 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Apr 6 18:20:45 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala        |  8 ++++----
 .../main/scala/kafka/api/ConsumerMetadataResponse.scala |  8 ++++----
 core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala | 10 +++++-----
 core/src/main/scala/kafka/api/TopicMetadata.scala       | 12 ++++++------
 .../main/scala/kafka/api/TopicMetadataResponse.scala    |  6 +++---
 .../main/scala/kafka/api/UpdateMetadataRequest.scala    |  4 ++--
 core/src/main/scala/kafka/client/ClientUtils.scala      | 10 +++++-----
 core/src/main/scala/kafka/cluster/Broker.scala          |  6 +++---
 core/src/main/scala/kafka/cluster/BrokerEndPoint.scala  | 12 ++++++------
 .../scala/kafka/consumer/ConsumerFetcherManager.scala   |  6 +++---
 .../scala/kafka/consumer/ConsumerFetcherThread.scala    |  4 ++--
 .../scala/kafka/javaapi/ConsumerMetadataResponse.scala  |  4 ++--
 core/src/main/scala/kafka/javaapi/TopicMetadata.scala   |  8 ++++----
 core/src/main/scala/kafka/producer/ProducerPool.scala   |  6 +++---
 .../scala/kafka/server/AbstractFetcherManager.scala     |  8 ++++----
 .../main/scala/kafka/server/AbstractFetcherThread.scala |  4 ++--
 core/src/main/scala/kafka/server/MetadataCache.scala    |  8 ++++----
 .../main/scala/kafka/server/ReplicaFetcherManager.scala |  4 ++--
 .../main/scala/kafka/server/ReplicaFetcherThread.scala  |  4 ++--
 core/src/main/scala/kafka/server/ReplicaManager.scala   |  4 ++--
 .../scala/kafka/tools/ReplicaVerificationTool.scala     |  6 +++---
 .../main/scala/kafka/tools/SimpleConsumerShell.scala    |  6 +++---
 core/src/main/scala/kafka/utils/ZkUtils.scala           |  2 +-
 .../kafka/api/RequestResponseSerializationTest.scala    |  4 ++--
 .../scala/unit/kafka/cluster/BrokerEndPointTest.scala   |  4 ++--
 .../unit/kafka/integration/TopicMetadataTest.scala      |  4 ++--
 .../scala/unit/kafka/producer/AsyncProducerTest.scala   |  8 ++++----
 27 files changed, 85 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 0d3332e..eee80f9 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,7 +18,7 @@
 package kafka.admin
 
 import kafka.common._
-import kafka.cluster.{BrokerEndpoint, Broker}
+import kafka.cluster.{BrokerEndPoint, Broker}
 
 import kafka.log.LogConfig
 import kafka.utils._
@@ -356,9 +356,9 @@ object AdminUtils extends Logging {
         val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
         debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader
= " + leader)
 
-        var leaderInfo: Option[BrokerEndpoint] = None
-        var replicaInfo: Seq[BrokerEndpoint] = Nil
-        var isrInfo: Seq[BrokerEndpoint] = Nil
+        var leaderInfo: Option[BrokerEndPoint] = None
+        var replicaInfo: Seq[BrokerEndPoint] = Nil
+        var isrInfo: Seq[BrokerEndPoint] = Nil
         try {
           leaderInfo = leader match {
             case Some(l) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index d2a3d43..ea1c0d0 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -18,18 +18,18 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import kafka.common.ErrorMapping
 
 object ConsumerMetadataResponse {
   val CurrentVersion = 0
 
-  private val NoBrokerEndpointOpt = Some(BrokerEndpoint(id = -1, host = "", port = -1))
+  private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
   
   def readFrom(buffer: ByteBuffer) = {
     val correlationId = buffer.getInt
     val errorCode = buffer.getShort
-    val broker = BrokerEndpoint.readFrom(buffer)
+    val broker = BrokerEndPoint.readFrom(buffer)
     val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
       Some(broker)
     else
@@ -40,7 +40,7 @@ object ConsumerMetadataResponse {
   
 }
 
-case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndpoint], errorCode: Short, correlationId: Int)
+case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
   extends RequestOrResponse() {
 
   def sizeInBytes =

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index bf93632..2fad585 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -21,7 +21,7 @@ package kafka.api
 import java.nio._
 import kafka.utils._
 import kafka.api.ApiUtils._
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.common.ErrorMapping
@@ -120,9 +120,9 @@ object LeaderAndIsrRequest {
     }
 
     val leadersCount = buffer.getInt
-    var leaders = Set[BrokerEndpoint]()
+    var leaders = Set[BrokerEndPoint]()
     for (i <- 0 until leadersCount)
-      leaders += BrokerEndpoint.readFrom(buffer)
+      leaders += BrokerEndPoint.readFrom(buffer)
 
    new LeaderAndIsrRequest(versionId, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
   }
@@ -134,10 +134,10 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 controllerId: Int,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[BrokerEndpoint])
+                                leaders: Set[BrokerEndPoint])
     extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndpoint], controllerId: Int,
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndPoint], controllerId: Int,
            controllerEpoch: Int, correlationId: Int, clientId: String) = {
     this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId,
          controllerId, controllerEpoch, partitionStateInfos, leaders)

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 6de447d..5e39f45 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
@@ -27,7 +27,7 @@ object TopicMetadata {
   
   val NoLeaderNodeId = -1
 
-  def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): TopicMetadata = {
+  def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): TopicMetadata = {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
@@ -88,7 +88,7 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
 
 object PartitionMetadata {
 
-  def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndpoint]): PartitionMetadata = {
+  def readFrom(buffer: ByteBuffer, brokers: Map[Int, BrokerEndPoint]): PartitionMetadata = {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
    val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
     val leaderId = buffer.getInt
@@ -109,9 +109,9 @@ object PartitionMetadata {
 }
 
 case class PartitionMetadata(partitionId: Int, 
-                             val leader: Option[BrokerEndpoint],
-                             replicas: Seq[BrokerEndpoint],
-                             isr: Seq[BrokerEndpoint] = Seq.empty,
+                             val leader: Option[BrokerEndPoint],
+                             replicas: Seq[BrokerEndPoint],
+                             isr: Seq[BrokerEndPoint] = Seq.empty,
                              errorCode: Short = ErrorMapping.NoError) extends Logging {
   def sizeInBytes: Int = {
     2 /* error code */ + 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index 776b604..f2f89e0 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import java.nio.ByteBuffer
 
 object TopicMetadataResponse {
@@ -25,7 +25,7 @@ object TopicMetadataResponse {
   def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
     val correlationId = buffer.getInt
     val brokerCount = buffer.getInt
-    val brokers = (0 until brokerCount).map(_ => BrokerEndpoint.readFrom(buffer))
+    val brokers = (0 until brokerCount).map(_ => BrokerEndPoint.readFrom(buffer))
     val brokerMap = brokers.map(b => (b.id, b)).toMap
     val topicCount = buffer.getInt
    val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
@@ -33,7 +33,7 @@ object TopicMetadataResponse {
   }
 }
 
-case class TopicMetadataResponse(brokers: Seq[BrokerEndpoint],
+case class TopicMetadataResponse(brokers: Seq[BrokerEndPoint],
                                  topicsMetadata: Seq[TopicMetadata],
                                  correlationId: Int)
     extends RequestOrResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index a91e3a6..69f0397 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -19,7 +19,7 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.api.ApiUtils._
-import kafka.cluster.{Broker, BrokerEndpoint}
+import kafka.cluster.{Broker, BrokerEndPoint}
 import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition}
 import kafka.network.RequestChannel.Response
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
@@ -53,7 +53,7 @@ object UpdateMetadataRequest {
     val numAliveBrokers = buffer.getInt
 
     val aliveBrokers = versionId match {
-      case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndpoint.readFrom(buffer),SecurityProtocol.PLAINTEXT)
+      case 0 => for(i <- 0 until numAliveBrokers) yield new Broker(BrokerEndPoint.readFrom(buffer),SecurityProtocol.PLAINTEXT)
       case 1 => for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer)
      case v => throw new KafkaException( "Version " + v.toString + " is invalid for UpdateMetadataRequest. Valid versions are 0 or 1.")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index f08aaf2..b66424b 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -43,7 +43,7 @@ object ClientUtils extends Logging{
    * @param producerConfig The producer's config
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
@@ -84,7 +84,7 @@ object ClientUtils extends Logging{
    * @param clientId The client's identifier
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndpoint], clientId: String, timeoutMs: Int,
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], clientId: String, timeoutMs: Int,
                          correlationId: Int = 0): TopicMetadataResponse = {
     val props = new Properties()
     props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
@@ -97,11 +97,11 @@ object ClientUtils extends Logging{
   /**
    * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
    */
-  def parseBrokerList(brokerListStr: String): Seq[BrokerEndpoint] = {
+  def parseBrokerList(brokerListStr: String): Seq[BrokerEndPoint] = {
     val brokersStr = CoreUtils.parseCsvList(brokerListStr)
 
     brokersStr.zipWithIndex.map { case (address, brokerId) =>
-      BrokerEndpoint.createBrokerEndPoint(brokerId, address)
+      BrokerEndPoint.createBrokerEndPoint(brokerId, address)
     }
   }
 
@@ -144,7 +144,7 @@ object ClientUtils extends Logging{
 
      while (!offsetManagerChannelOpt.isDefined) {
 
-       var coordinatorOpt: Option[BrokerEndpoint] = None
+       var coordinatorOpt: Option[BrokerEndPoint] = None
 
        while (!coordinatorOpt.isDefined) {
          try {
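
As a usage note, the renamed type is what broker-list parsing yields; a brief illustrative call (the host:port values here are made up, and broker ids are assigned positionally by zipWithIndex):

    import kafka.client.ClientUtils

    // "host1:port1,host2:port2" becomes BrokerEndPoint(0, ...), BrokerEndPoint(1, ...)
    val brokers = ClientUtils.parseBrokerList("broker1:9092,broker2:9093")
    brokers.foreach(b => println(b.id + " -> " + b.connectionString()))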

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 8e603b6..79e16c1 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -110,7 +110,7 @@ case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) {
     this(id, Map(protocol -> EndPoint(host, port, protocol)))
   }
 
-  def this(bep: BrokerEndpoint, protocol: SecurityProtocol) = {
+  def this(bep: BrokerEndPoint, protocol: SecurityProtocol) = {
     this(bep.id, bep.host, bep.port, protocol)
   }
 
@@ -132,10 +132,10 @@ case class Broker(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) {
     endPoints.contains(protocolType)
   }
 
-  def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndpoint = {
+  def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = {
     val endpoint = endPoints.get(protocolType)
     endpoint match {
-      case Some(endpoint) => new BrokerEndpoint(id, endpoint.host, endpoint.port)
+      case Some(endpoint) => new BrokerEndPoint(id, endpoint.host, endpoint.port)
       case None =>
        throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id))
     }
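
A short sketch of resolving the renamed type from a Broker, assuming the four-argument constructor implied by the hunk above (a broker with a single PLAINTEXT listener); getBrokerEndPoint throws BrokerEndPointNotAvailableException when the requested protocol has no endpoint:

    import kafka.cluster.Broker
    import org.apache.kafka.common.protocol.SecurityProtocol

    val broker = new Broker(0, "localhost", 9092, SecurityProtocol.PLAINTEXT)
    val endPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)  // BrokerEndPoint(0, "localhost", 9092)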

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
index 22dba18..3395108 100644
--- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -22,24 +22,24 @@ import kafka.api.ApiUtils._
 import kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils._
 
-object BrokerEndpoint {
-  def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndpoint = {
+object BrokerEndPoint {
+  def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndPoint = {
 
     // BrokerEndPoint URI is host:port or [ipv6_host]:port
     // Note that unlike EndPoint (or listener) this URI has no security information.
     val uriParseExp = """\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r
 
     connectionString match {
-      case uriParseExp(host, port) => new BrokerEndpoint(brokerId, host, port.toInt)
+      case uriParseExp(host, port) => new BrokerEndPoint(brokerId, host, port.toInt)
      case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
     }
   }
 
-  def readFrom(buffer: ByteBuffer): BrokerEndpoint = {
+  def readFrom(buffer: ByteBuffer): BrokerEndPoint = {
     val brokerId = buffer.getInt()
     val host = readShortString(buffer)
     val port = buffer.getInt()
-    BrokerEndpoint(brokerId, host, port)
+    BrokerEndPoint(brokerId, host, port)
   }
 }
 
@@ -50,7 +50,7 @@ object BrokerEndpoint {
  * Clients should know which security protocol to use from configuration.
 * This allows us to keep the wire protocol with the clients unchanged where the protocol is not needed.
  */
-case class BrokerEndpoint(id: Int, host: String, port: Int) {
+case class BrokerEndPoint(id: Int, host: String, port: Int) {
 
   def connectionString(): String = formatAddress(host, port)
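
The companion object's parser accepts host:port or [ipv6_host]:port, which BrokerEndPointTest below exercises; an illustrative call (values chosen to match that test):

    import kafka.cluster.BrokerEndPoint

    val v4 = BrokerEndPoint.createBrokerEndPoint(1, "localhost:9092")  // host "localhost", port 9092
    val v6 = BrokerEndPoint.createBrokerEndPoint(1, "[::1]:9092")      // host "::1", port 9092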
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 6bb0d56..49b683f 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -19,7 +19,7 @@ package kafka.consumer
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
-import kafka.cluster.{BrokerEndpoint, Cluster}
+import kafka.cluster.{BrokerEndPoint, Cluster}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection.immutable
 import collection.mutable.HashMap
@@ -53,7 +53,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
     override def doWork() {
-      val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndpoint]
+      val leaderForPartitionsMap = new HashMap[TopicAndPartition, BrokerEndPoint]
       lock.lock()
       try {
         while (noLeaderPartitionSet.isEmpty) {
@@ -114,7 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     }
   }
 
-  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
     new ConsumerFetcherThread(
       "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
       config, sourceBroker, partitionMap, this)

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index cde4481..33ea728 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -17,7 +17,7 @@
 
 package kafka.consumer
 
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData}
@@ -26,7 +26,7 @@ import kafka.common.TopicAndPartition
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
-                            sourceBroker: BrokerEndpoint,
+                            sourceBroker: BrokerEndPoint,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name,

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
index 9c14428..4345a8e 100644
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -18,13 +18,13 @@
 package kafka.javaapi
 
 import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 
 class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
 
   def errorCode = underlying.errorCode
 
-  def coordinator: BrokerEndpoint = {
+  def coordinator: BrokerEndPoint = {
     import kafka.javaapi.Implicits._
     underlying.coordinatorOpt
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index ebbd589..4ef8321 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -16,7 +16,7 @@
  */
 package kafka.javaapi
 
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import scala.collection.JavaConversions
 
 private[javaapi] object MetadataListImplicits {
@@ -52,17 +52,17 @@ class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
 class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
   def partitionId: Int = underlying.partitionId
 
-  def leader: BrokerEndpoint = {
+  def leader: BrokerEndPoint = {
     import kafka.javaapi.Implicits._
     underlying.leader
   }
 
-  def replicas: java.util.List[BrokerEndpoint] = {
+  def replicas: java.util.List[BrokerEndPoint] = {
     import JavaConversions._
     underlying.replicas
   }
 
-  def isr: java.util.List[BrokerEndpoint] = {
+  def isr: java.util.List[BrokerEndPoint] = {
     import JavaConversions._
     underlying.isr
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 07feb05..5ad6812 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -20,7 +20,7 @@ package kafka.producer
 import java.util.Properties
 
 import kafka.api.TopicMetadata
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import kafka.common.UnavailableProducerException
 import kafka.utils.Logging
 
@@ -31,7 +31,7 @@ object ProducerPool {
   /**
    * Used in ProducerPool to initiate a SyncProducer connection with a broker.
    */
-  def createSyncProducer(config: ProducerConfig, broker: BrokerEndpoint): SyncProducer = {
+  def createSyncProducer(config: ProducerConfig, broker: BrokerEndPoint): SyncProducer = {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
@@ -45,7 +45,7 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
   private val lock = new Object()
 
   def updateProducer(topicMetadata: Seq[TopicMetadata]) {
-    val newBrokers = new collection.mutable.HashSet[BrokerEndpoint]
+    val newBrokers = new collection.mutable.HashSet[BrokerEndPoint]
     topicMetadata.foreach(tmd => {
       tmd.partitionsMetadata.foreach(pmd => {
         if(pmd.leader.isDefined) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index f8f9331..ec40516 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import scala.collection.Set
 import scala.collection.Map
 import kafka.utils.Logging
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
@@ -69,7 +69,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   }
 
   // to be defined in subclass to create a specific fetcher
-  def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread
+  def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread
 
   def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
     mapLock synchronized {
@@ -127,6 +127,6 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
   }
 }
 
-case class BrokerAndFetcherId(broker: BrokerEndpoint, fetcherId: Int)
+case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)
 
-case class BrokerAndInitialOffset(broker: BrokerEndpoint, initOffset: Long)
+case class BrokerAndInitialOffset(broker: BrokerEndPoint, initOffset: Long)

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1e26de2..f178527 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
@@ -37,7 +37,7 @@ import com.yammer.metrics.core.Gauge
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
 
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndpoint, socketTimeout: Int, socketBufferSize: Int,
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4460b42..9a9205f 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.cluster.{BrokerEndpoint,Broker}
+import kafka.cluster.{BrokerEndPoint,Broker}
 import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
 import kafka.common.TopicAndPartition
 
@@ -54,10 +54,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
           val partitionMetadata = partitionStateInfos.map {
             case (partitionId, partitionState) =>
               val replicas = partitionState.allReplicas
-              val replicaInfo: Seq[BrokerEndpoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol))
-              var leaderInfo: Option[BrokerEndpoint] = None
+              val replicaInfo: Seq[BrokerEndPoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol))
+              var leaderInfo: Option[BrokerEndPoint] = None
               var leaderBrokerInfo: Option[Broker] = None
-              var isrInfo: Seq[BrokerEndpoint] = Nil
+              var isrInfo: Seq[BrokerEndPoint] = Nil
               val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
               val leader = leaderIsrAndEpoch.leaderAndIsr.leader
               val isr = leaderIsrAndEpoch.leaderAndIsr.isr

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index f0a2a5b..ef38ed3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -17,13 +17,13 @@
 
 package kafka.server
 
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
                                        "Replica", brokerConfig.numReplicaFetchers) {
 
-  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id),
sourceBroker, brokerConfig, replicaMgr)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b2196c8..2d84afa 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,14 +18,14 @@
 package kafka.server
 
 import kafka.admin.AdminUtils
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 
 class ReplicaFetcherThread(name:String,
-                           sourceBroker: BrokerEndpoint,
+                           sourceBroker: BrokerEndPoint,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 144a15e..b06f00b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.api._
 import kafka.common._
 import kafka.utils._
-import kafka.cluster.{BrokerEndpoint, Partition, Replica}
+import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.log.{LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
@@ -684,7 +684,7 @@ class ReplicaManager(val config: KafkaConfig,
   * the error message will be set on each partition since we do not know which partition caused it
    */
   private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
-                            leaders: Set[BrokerEndpoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
+                            leaders: Set[BrokerEndPoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
                             offsetManager: OffsetManager) {
     partitionState.foreach { state =>
      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index d1050b4..1366172 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import joptsimple.OptionParser
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicReference
@@ -197,7 +197,7 @@ private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, c
 private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
                             leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
                             expectedNumFetchers: Int,
-                            brokerMap: Map[Int, BrokerEndpoint],
+                            brokerMap: Map[Int, BrokerEndPoint],
                             initialOffsetTime: Long,
                             reportInterval: Long) extends Logging {
   private val fetchOffsetMap = new Pool[TopicAndPartition, Long]
@@ -335,7 +335,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
   }
 }
 
-private class ReplicaFetcher(name: String, sourceBroker: BrokerEndpoint, topicAndPartitions: Iterable[TopicAndPartition],
+private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAndPartitions: Iterable[TopicAndPartition],
                              replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
                              fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
   extends ShutdownableThread(name) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 9a6804c..dec9516 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -22,7 +22,7 @@ import kafka.utils._
 import kafka.consumer._
 import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
-import kafka.cluster.BrokerEndpoint
+import kafka.cluster.BrokerEndPoint
 import scala.collection.JavaConversions._
 import kafka.common.TopicAndPartition
 import org.apache.kafka.common.utils.Utils
@@ -143,8 +143,8 @@ object SimpleConsumerShell extends Logging {
     }
 
     // validating replica id and initializing target broker
-    var fetchTargetBroker: BrokerEndpoint = null
-    var replicaOpt: Option[BrokerEndpoint] = null
+    var fetchTargetBroker: BrokerEndPoint = null
+    var replicaOpt: Option[BrokerEndPoint] = null
     if(replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
       if(!replicaOpt.isDefined) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 82b0e33..b03172a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -83,7 +83,7 @@ object ZkUtils extends Logging {
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-  def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndpoint] = {
+  def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndPoint] = {
     getAllBrokersInCluster(zkClient).map(_.getBrokerEndPoint(protocolType))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 83910f3..566b538 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -18,7 +18,7 @@
 package kafka.api
 
 
-import kafka.cluster.{BrokerEndpoint, EndPoint, Broker}
+import kafka.cluster.{BrokerEndPoint, EndPoint, Broker}
 import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
 import kafka.common._
 import kafka.message.{Message, ByteBufferMessageSet}
@@ -122,7 +122,7 @@ object SerializationTestUtils {
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndpoint](), 0, 1, 0, "")
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndPoint](), 0, 1, 0, "")
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index ad58eed..bb2506c 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -91,12 +91,12 @@ class BrokerEndPointTest extends JUnit3Suite with Logging {
   @Test
   def testBrokerEndpointFromURI() = {
     var connectionString = "localhost:9092"
-    var endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString)
+    var endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
     assert(endpoint.host == "localhost")
     assert(endpoint.port == 9092)
     // also test for ipv6
     connectionString = "[::1]:9092"
-    endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString)
+    endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
     assert(endpoint.host == "::1")
     assert(endpoint.port == 9092)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 603cf76..995b059 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -23,7 +23,7 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.AdminUtils
 import java.nio.ByteBuffer
 import junit.framework.Assert._
-import kafka.cluster.{BrokerEndpoint, Broker}
+import kafka.cluster.{BrokerEndPoint, Broker}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.server.{KafkaServer, KafkaConfig}
@@ -33,7 +33,7 @@ import kafka.client.ClientUtils
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var server1: KafkaServer = null
-  var brokerEndPoints: Seq[BrokerEndpoint] = null
+  var brokerEndPoints: Seq[BrokerEndPoint] = null
 
   override def setUp() {
     super.setUp()

http://git-wip-us.apache.org/repos/asf/kafka/blob/04fce48e/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 2169a5c..be4bb87 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -23,7 +23,7 @@ import junit.framework.Assert._
 import org.easymock.EasyMock
 import org.junit.Test
 import kafka.api._
-import kafka.cluster.{BrokerEndpoint, Broker}
+import kafka.cluster.{BrokerEndPoint, Broker}
 import kafka.common._
 import kafka.message._
 import kafka.producer.async._
@@ -165,8 +165,8 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
-    val broker1 = new BrokerEndpoint(0, "localhost", 9092)
-    val broker2 = new BrokerEndpoint(1, "localhost", 9093)
+    val broker1 = new BrokerEndPoint(0, "localhost", 9092)
+    val broker2 = new BrokerEndPoint(1, "localhost", 9093)
 
     // form expected partitions metadata
     val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
@@ -469,7 +469,7 @@ class AsyncProducerTest extends JUnit3Suite {
   }
 
   private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    val broker1 = new BrokerEndpoint(brokerId, brokerHost, brokerPort)
+    val broker1 = new BrokerEndPoint(brokerId, brokerHost, brokerPort)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
   

