From commits-return-3599-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Wed Feb 17 10:48:26 2016 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 56774181D7 for ; Wed, 17 Feb 2016 10:48:26 +0000 (UTC) Received: (qmail 44619 invoked by uid 500); 17 Feb 2016 10:48:13 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 44583 invoked by uid 500); 17 Feb 2016 10:48:13 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 44574 invoked by uid 99); 17 Feb 2016 10:48:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Feb 2016 10:48:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 70DF0E0092; Wed, 17 Feb 2016 10:48:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: <61120891685642cb9d17c8641acb127f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-2757; Consolidate BrokerEndPoint and EndPoint Date: Wed, 17 Feb 2016 10:48:13 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk 2faf9f60c -> 3382b6db7 KAFKA-2757; Consolidate BrokerEndPoint and EndPoint Author: zhuchen1018 Reviewers: Dong Lin , Guozhang Wang Closes #911 from zhuchen1018/KAFKA-2757 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3382b6db Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3382b6db Diff: 
http://git-wip-us.apache.org/repos/asf/kafka/diff/3382b6db Branch: refs/heads/trunk Commit: 3382b6db7b2b9c17a4ccfd9ebe840741bcf44670 Parents: 2faf9f6 Author: Chen Zhu Authored: Wed Feb 17 18:48:00 2016 +0800 Committer: Guozhang Wang Committed: Wed Feb 17 18:48:00 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kafka/common/BrokerEndPoint.java | 88 ++++++++++++++++++++ .../common/requests/LeaderAndIsrRequest.java | 31 +++---- .../common/requests/UpdateMetadataRequest.java | 18 +--- .../common/requests/RequestResponseTest.java | 13 +-- .../controller/ControllerChannelManager.scala | 6 +- .../kafka/api/AuthorizerIntegrationTest.scala | 4 +- .../unit/kafka/server/LeaderElectionTest.scala | 4 +- 7 files changed, 115 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java b/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java new file mode 100644 index 0000000..d5275c4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/BrokerEndPoint.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common; + +import java.io.Serializable; + +/** + * Broker id, host and port + */ +public final class BrokerEndPoint implements Serializable { + + private int hash = 0; + private final int id; + private final String host; + private final int port; + + public BrokerEndPoint(int id, String host, int port) { + this.id = id; + this.host = host; + this.port = port; + } + + public int id() { + return id; + } + + public String host() { + return host; + } + + public int port() { + return port; + } + + @Override + public int hashCode() { + if (hash != 0) + return hash; + final int prime = 31; + int result = 1; + result = prime * result + id; + result = prime * result + ((host == null) ? 
0 : host.hashCode()); + result = prime * result + port; + this.hash = result; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + BrokerEndPoint other = (BrokerEndPoint) obj; + if (id != other.id) + return false; + if (port != other.port) + return false; + if (host == null) { + if (other.host != null) + return false; + } else if (!host.equals(other.host)) + return false; + return true; + } + + @Override + public String toString() { + return "[" + id + ", " + host + ":" + port + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index a77a7cb..264af90 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.BrokerEndPoint; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -53,18 +54,6 @@ public class LeaderAndIsrRequest extends AbstractRequest { } - public static final class EndPoint { - public final int id; - public final String host; - public final int port; - - public EndPoint(int id, String host, int port) { - this.id = id; - this.host = host; - this.port = port; - } - } - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id); private static final String CONTROLLER_ID_KEY_NAME = "controller_id"; @@ -89,10 +78,10 @@ public 
class LeaderAndIsrRequest extends AbstractRequest { private final int controllerId; private final int controllerEpoch; private final Map partitionStates; - private final Set<EndPoint> liveLeaders; + private final Set<BrokerEndPoint> liveLeaders; public LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map partitionStates, - Set<EndPoint> liveLeaders) { + Set<BrokerEndPoint> liveLeaders) { super(new Struct(CURRENT_SCHEMA)); struct.set(CONTROLLER_ID_KEY_NAME, controllerId); struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); @@ -115,11 +104,11 @@ public class LeaderAndIsrRequest extends AbstractRequest { struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); List leadersData = new ArrayList<>(liveLeaders.size()); - for (EndPoint leader : liveLeaders) { + for (BrokerEndPoint leader : liveLeaders) { Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME); - leaderData.set(END_POINT_ID_KEY_NAME, leader.id); - leaderData.set(HOST_KEY_NAME, leader.host); - leaderData.set(PORT_KEY_NAME, leader.port); + leaderData.set(END_POINT_ID_KEY_NAME, leader.id()); + leaderData.set(HOST_KEY_NAME, leader.host()); + leaderData.set(PORT_KEY_NAME, leader.port()); leadersData.add(leaderData); } struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray()); @@ -159,13 +148,13 @@ public class LeaderAndIsrRequest extends AbstractRequest { } - Set<EndPoint> leaders = new HashSet<>(); + Set<BrokerEndPoint> leaders = new HashSet<>(); for (Object leadersDataObj : struct.getArray(LIVE_LEADERS_KEY_NAME)) { Struct leadersData = (Struct) leadersDataObj; int id = leadersData.getInt(END_POINT_ID_KEY_NAME); String host = leadersData.getString(HOST_KEY_NAME); int port = leadersData.getInt(PORT_KEY_NAME); - leaders.add(new EndPoint(id, host, port)); + leaders.add(new BrokerEndPoint(id, host, port)); } controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME); @@ -202,7 +191,7 @@ public class LeaderAndIsrRequest extends AbstractRequest { return partitionStates; } - public Set<EndPoint> liveLeaders() { + public Set<BrokerEndPoint> liveLeaders() { return liveLeaders; } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 808161c..d8d8013 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.BrokerEndPoint; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -71,19 +72,6 @@ public class UpdateMetadataRequest extends AbstractRequest { } } - @Deprecated - public static final class BrokerEndPoint { - public final int id; - public final String host; - public final int port; - - public BrokerEndPoint(int id, String host, int port) { - this.id = id; - this.host = host; - this.port = port; - } - } - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.UPDATE_METADATA_KEY.id); private static final String CONTROLLER_ID_KEY_NAME = "controller_id"; @@ -128,8 +116,8 @@ public class UpdateMetadataRequest extends AbstractRequest { Set brokers = new HashSet<>(brokerEndPoints.size()); for (BrokerEndPoint brokerEndPoint : brokerEndPoints) { Map endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT, - new EndPoint(brokerEndPoint.host, brokerEndPoint.port)); - brokers.add(new Broker(brokerEndPoint.id, endPoints)); + new EndPoint(brokerEndPoint.host(), brokerEndPoint.port())); + brokers.add(new Broker(brokerEndPoint.id(), endPoints)); } return brokers; } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index db9c81a..5fc5551 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.BrokerEndPoint; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -353,9 +354,9 @@ public class RequestResponseTest { partitionStates.put(new TopicPartition("topic20", 1), new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); - Set<LeaderAndIsrRequest.EndPoint> leaders = new HashSet<>(Arrays.asList( - new LeaderAndIsrRequest.EndPoint(0, "test0", 1223), - new LeaderAndIsrRequest.EndPoint(1, "test1", 1223) + Set<BrokerEndPoint> leaders = new HashSet<>(Arrays.asList( + new BrokerEndPoint(0, "test0", 1223), + new BrokerEndPoint(1, "test1", 1223) )); return new LeaderAndIsrRequest(1, 10, partitionStates, leaders); @@ -379,9 +380,9 @@ public class RequestResponseTest { new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); if (version == 0) { - Set<UpdateMetadataRequest.BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList( - new UpdateMetadataRequest.BrokerEndPoint(0, "host1", 1223), - new UpdateMetadataRequest.BrokerEndPoint(1, "host2", 1234) + Set<BrokerEndPoint> liveBrokers = new HashSet<>(Arrays.asList( + new BrokerEndPoint(0, "host1", 1223), + new BrokerEndPoint(1, "host2", 1234) )); return new UpdateMetadataRequest(1, 10, liveBrokers, partitionStates); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index e52a9d3..02ba814 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -19,7 +19,7 @@ package kafka.controller import kafka.api.{LeaderAndIsr, KAFKA_090, PartitionStateInfo} import kafka.utils._ import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient} -import org.apache.kafka.common.{TopicPartition, Node} +import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, Node} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode} import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys} @@ -352,7 +352,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { b => val brokerEndPoint = b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol) - new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) + new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) } val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) => val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch @@ -386,7 +386,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging if (version 
== 0) { val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker => val brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT) - new UpdateMetadataRequest.BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) + new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) } new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava) } http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index db2040f..33027e7 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.{TopicPartition, requests} +import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, requests} import org.junit.Assert._ import org.junit.{After, Assert, Before, Test} @@ -215,7 +215,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { private def createLeaderAndIsrRequest = { new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue, Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, - Set(new requests.LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava) + Set(new BrokerEndPoint(brokerId,"localhost", 0)).asJava) } private def createStopReplicaRequest = { 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3382b6db/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 704f776..94013bc 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -17,7 +17,7 @@ package kafka.server -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{BrokerEndPoint, TopicPartition} import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState import scala.collection.JavaConverters._ @@ -132,7 +132,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())) val brokerEndPoints = brokers.map { b => val brokerEndPoint = b.getBrokerEndPoint(SecurityProtocol.PLAINTEXT) - new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) + new BrokerEndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) } val controllerContext = new ControllerContext(zkUtils, 6000)