kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3431: Remove `o.a.k.common.BrokerEndPoint` in favour of `Node`
Date Wed, 23 Mar 2016 02:13:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 d1e74890c -> aead28a54


KAFKA-3431: Remove `o.a.k.common.BrokerEndPoint` in favour of `Node`

Also included a minor efficiency improvement in kafka.cluster.EndPoint.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira

Closes #1105 from ijuma/kafka-3431-replace-broker-end-point-with-node

(cherry picked from commit 255b5e13863a95cfc327236856db2df188f04d49)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aead28a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aead28a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aead28a5

Branch: refs/heads/0.10.0
Commit: aead28a54e55ca0541d696abced523b453bb50f7
Parents: d1e7489
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Mar 22 19:13:26 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Mar 22 19:13:35 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/BrokerEndPoint.java | 88 --------------------
 .../common/requests/LeaderAndIsrRequest.java    | 14 ++--
 .../common/requests/UpdateMetadataRequest.java  |  8 +-
 .../common/requests/RequestResponseTest.java    | 13 ++-
 .../src/main/scala/kafka/cluster/EndPoint.scala |  3 +-
 .../controller/ControllerChannelManager.scala   | 12 +--
 .../kafka/api/AuthorizerIntegrationTest.scala   |  8 +-
 .../unit/kafka/server/LeaderElectionTest.scala  | 17 ++--
 .../unit/kafka/server/ReplicaManagerTest.scala  |  6 +-
 9 files changed, 38 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java b/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
deleted file mode 100644
index d5275c4..0000000
--- a/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common;
-
-import java.io.Serializable;
-
-/**
- * Broker id, host and port
- */
-public final class BrokerEndPoint implements Serializable {
-
-    private int hash = 0;
-    private final int id;
-    private final String host;
-    private final int port;
-
-    public BrokerEndPoint(int id, String host, int port) {
-        this.id = id;
-        this.host = host;
-        this.port = port;
-    }
-
-    public int id() {
-        return id;
-    }
-
-    public String host() {
-        return host;
-    }
-
-    public int port() {
-        return port;
-    }
-
-    @Override
-    public int hashCode() {
-        if (hash != 0)
-            return hash;
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + id;
-        result = prime * result + ((host == null) ? 0 : host.hashCode());
-        result = prime * result + port;
-        this.hash = result;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        BrokerEndPoint other = (BrokerEndPoint) obj;
-        if (id != other.id)
-            return false;
-        if (port != other.port)
-            return false;
-        if (host == null) {
-            if (other.host != null)
-                return false;
-        } else if (!host.equals(other.host))
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "[" + id + ", " + host + ":" + port + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 264af90..fee3c21 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.BrokerEndPoint;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -78,10 +78,10 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     private final int controllerId;
     private final int controllerEpoch;
     private final Map<TopicPartition, PartitionState> partitionStates;
-    private final Set<BrokerEndPoint> liveLeaders;
+    private final Set<Node> liveLeaders;
 
     public LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
-                               Set<BrokerEndPoint> liveLeaders) {
+                               Set<Node> liveLeaders) {
         super(new Struct(CURRENT_SCHEMA));
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
         struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
@@ -104,7 +104,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
 
         List<Struct> leadersData = new ArrayList<>(liveLeaders.size());
-        for (BrokerEndPoint leader : liveLeaders) {
+        for (Node leader : liveLeaders) {
             Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME);
             leaderData.set(END_POINT_ID_KEY_NAME, leader.id());
             leaderData.set(HOST_KEY_NAME, leader.host());
@@ -148,13 +148,13 @@ public class LeaderAndIsrRequest extends AbstractRequest {
 
         }
 
-        Set<BrokerEndPoint> leaders = new HashSet<>();
+        Set<Node> leaders = new HashSet<>();
         for (Object leadersDataObj : struct.getArray(LIVE_LEADERS_KEY_NAME)) {
             Struct leadersData = (Struct) leadersDataObj;
             int id = leadersData.getInt(END_POINT_ID_KEY_NAME);
             String host = leadersData.getString(HOST_KEY_NAME);
             int port = leadersData.getInt(PORT_KEY_NAME);
-            leaders.add(new BrokerEndPoint(id, host, port));
+            leaders.add(new Node(id, host, port));
         }
 
         controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
@@ -191,7 +191,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         return partitionStates;
     }
 
-    public Set<BrokerEndPoint> liveLeaders() {
+    public Set<Node> liveLeaders() {
         return liveLeaders;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 4c3d0a7..27f89fa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -13,7 +13,7 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.BrokerEndPoint;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -113,15 +113,15 @@ public class UpdateMetadataRequest extends AbstractRequest {
      * Constructor for version 0.
      */
     @Deprecated
-    public UpdateMetadataRequest(int controllerId, int controllerEpoch, Set<BrokerEndPoint> liveBrokers,
+    public UpdateMetadataRequest(int controllerId, int controllerEpoch, Set<Node> liveBrokers,
                                  Map<TopicPartition, PartitionState> partitionStates) {
         this(0, controllerId, controllerEpoch, partitionStates,
              brokerEndPointsToBrokers(liveBrokers));
     }
 
-    private static Set<Broker> brokerEndPointsToBrokers(Set<BrokerEndPoint> brokerEndPoints) {
+    private static Set<Broker> brokerEndPointsToBrokers(Set<Node> brokerEndPoints) {
         Set<Broker> brokers = new HashSet<>(brokerEndPoints.size());
-        for (BrokerEndPoint brokerEndPoint : brokerEndPoints) {
+        for (Node brokerEndPoint : brokerEndPoints) {
             Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
                     new EndPoint(brokerEndPoint.host(), brokerEndPoint.port()));
             brokers.add(new Broker(brokerEndPoint.id(), endPoints, null));

http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a4c5238..9def557 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -13,7 +13,6 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.BrokerEndPoint;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownServerException;
@@ -374,9 +373,9 @@ public class RequestResponseTest {
         partitionStates.put(new TopicPartition("topic20", 1),
                 new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
-        Set<BrokerEndPoint> leaders = new HashSet<>(Arrays.asList(
-                new BrokerEndPoint(0, "test0", 1223),
-                new BrokerEndPoint(1, "test1", 1223)
+        Set<Node> leaders = new HashSet<>(Arrays.asList(
+                new Node(0, "test0", 1223),
+                new Node(1, "test1", 1223)
         ));
 
         return new LeaderAndIsrRequest(1, 10, partitionStates, leaders);
@@ -401,9 +400,9 @@ public class RequestResponseTest {
                 new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
         if (version == 0) {
-            Set<BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList(
-                    new BrokerEndPoint(0, "host1", 1223),
-                    new BrokerEndPoint(1, "host2", 1234)
+            Set<Node> liveBrokers = new HashSet<>(Arrays.asList(
+                    new Node(0, "host1", 1223),
+                    new Node(1, "host2", 1234)
             ));
 
             return new UpdateMetadataRequest(1, 10, liveBrokers, partitionStates);

http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/core/src/main/scala/kafka/cluster/EndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 32c27ed..3d24862 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -26,6 +26,8 @@ import org.apache.kafka.common.utils.Utils
 
 object EndPoint {
 
+  private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-.:]*)\]?:(-?[0-9]+)""".r
+
   def readFrom(buffer: ByteBuffer): EndPoint = {
     val port = buffer.getInt()
     val host = readShortString(buffer)
@@ -42,7 +44,6 @@ object EndPoint {
    * @return
    */
   def createEndPoint(connectionString: String): EndPoint = {
-    val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-.:]*)\]?:(-?[0-9]+)""".r
     connectionString match {
       case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.forName(protocol))
       case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.forName(protocol))

http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ea156fa..b376d15 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, Networ
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{BrokerEndPoint, Node, TopicPartition}
+import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Set, mutable}
@@ -351,9 +351,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
                                                                    topicPartition.topic, topicPartition.partition))
         }
         val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
-        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { b =>
-          val brokerEndPoint = b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)
-          new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
+        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
+          _.getNode(controller.config.interBrokerSecurityProtocol)
         }
         val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
           val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -387,10 +386,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
 
         val updateMetadataRequest =
           if (version == 0) {
-            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
-              val brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
-              new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
-            }
+            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
             new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
           }
           else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index fad7657..bc705f1 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -17,6 +17,7 @@ import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.concurrent.ExecutionException
 import java.util.{ArrayList, Collections, Properties}
+
 import kafka.cluster.EndPoint
 import kafka.common.TopicAndPartition
 import kafka.coordinator.GroupCoordinator
@@ -24,15 +25,16 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.security.auth._
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, requests}
+import org.apache.kafka.common.{Node, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
@@ -214,7 +216,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   private def createLeaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
       Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
-      Set(new BrokerEndPoint(brokerId,"localhost", 0)).asJava)
+      Set(new Node(brokerId, "localhost", 0)).asJava)
   }
 
   private def createStopReplicaRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 94013bc..e84780a 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -17,22 +17,22 @@
 
 package kafka.server
 
-import org.apache.kafka.common.{BrokerEndPoint, TopicPartition}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 
 import scala.collection.JavaConverters._
 import kafka.api.LeaderAndIsr
-import org.apache.kafka.common.requests.{LeaderAndIsrResponse, LeaderAndIsrRequest, AbstractRequestResponse}
+import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse}
 import org.junit.Assert._
-import kafka.utils.{TestUtils, CoreUtils}
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.cluster.Broker
 import kafka.controller.{ControllerChannelManager, ControllerContext}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.utils.SystemTime
-import org.junit.{Test, After, Before}
+import org.junit.{After, Before, Test}
 
 class LeaderElectionTest extends ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -130,10 +130,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
     val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))
-    val brokerEndPoints = brokers.map { b =>
-      val brokerEndPoint = b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)
-      new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
-    }
+    val nodes = brokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
 
     val controllerContext = new ControllerContext(zkUtils, 6000)
     controllerContext.liveBrokers = brokers.toSet
@@ -148,7 +145,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
           Set(0, 1).map(Integer.valueOf).asJava)
       )
       val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, staleControllerEpoch, partitionStates.asJava,
-        brokerEndPoints.toSet.asJava)
+        nodes.toSet.asJava)
 
       controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
         staleControllerEpochCallback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/aead28a5/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a5a8df1..ee14af4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
-import org.apache.kafka.common.{BrokerEndPoint, TopicPartition}
+import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
@@ -162,7 +162,7 @@ class ReplicaManagerTest {
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
-        Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava)
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava)
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
       rm.getLeaderReplicaIfLocal(topic, 0)
 
@@ -185,7 +185,7 @@ class ReplicaManagerTest {
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava,
-        Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava)
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava)
       rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {})
 
       assertTrue(produceCallbackFired)


Mime
View raw message