Updated Branches:
refs/heads/trunk bf4dbd5ee -> a55ec0620
kafka-1092; Add server config parameter to separate bind address and ZK hostname; patched
by Roger Hoover; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a55ec062
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a55ec062
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a55ec062
Branch: refs/heads/trunk
Commit: a55ec0620f6ce805fafe2e1d4035ec3e0ab4e0d0
Parents: bf4dbd5
Author: Roger Hoover <roger.hoover@gmail.com>
Authored: Wed Oct 30 21:06:23 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 30 21:06:23 2013 -0700
----------------------------------------------------------------------
config/server.properties | 13 +++--
.../main/scala/kafka/server/KafkaConfig.scala | 13 ++++-
.../scala/kafka/server/KafkaHealthcheck.scala | 14 +++---
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../unit/kafka/producer/ProducerTest.scala | 12 ++---
.../unit/kafka/server/AdvertiseBrokerTest.scala | 52 ++++++++++++++++++++
.../unit/kafka/server/KafkaConfigTest.scala | 34 ++++++++++++-
7 files changed, 118 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 2eccc5e..8efa83f 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -24,11 +24,18 @@ broker.id=0
# The port the socket server listens on
port=9092
-# Hostname the broker will bind to and advertise to producers and consumers.
-# If not set, the server will bind to all interfaces and advertise the value returned from
-# from java.net.InetAddress.getCanonicalHostName().
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces.
#host.name=localhost
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
# The number of threads handling network requests
num.network.threads=2
http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 74442b6..b324344 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -69,8 +69,19 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val port: Int = props.getInt("port", 6667)
/* hostname of broker. If this is set, it will only bind to this address. If this is not set,
- * it will bind to all interfaces, and publish one to ZK */
+ * it will bind to all interfaces */
val hostName: String = props.getString("host.name", null)
+
+ /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may
+ * need to be different from the interface to which the broker binds. If this is not set,
+ * it will use the value for "host.name" if configured. Otherwise,
+ * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */
+ val advertisedHostName: String = props.getString("advertised.host.name", hostName)
+
+ /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may
+ * need to be different from the port to which the broker binds. If this is not set,
+ * it will publish the same port that the broker binds to. */
+ val advertisedPort: Int = props.getInt("advertised.port", port)
/* the SO_SNDBUF buffer of the socket server sockets */
val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 84ea17a..9dca55c 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -27,14 +27,14 @@ import java.net.InetAddress
/**
* This class registers the broker in zookeeper to allow
* other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
- * /brokers/[0...N] --> host:port
+ * /brokers/[0...N] --> advertisedHost:advertisedPort
*
* Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
* we are dead.
*/
class KafkaHealthcheck(private val brokerId: Int,
- private val host: String,
- private val port: Int,
+ private val advertisedHost: String,
+ private val advertisedPort: Int,
private val zkSessionTimeoutMs: Int,
private val zkClient: ZkClient) extends Logging {
@@ -49,13 +49,13 @@ class KafkaHealthcheck(private val brokerId: Int,
* Register this broker as "alive" in zookeeper
*/
def register() {
- val hostName =
- if(host == null || host.trim.isEmpty)
+ val advertisedHostName =
+ if(advertisedHost == null || advertisedHost.trim.isEmpty)
InetAddress.getLocalHost.getCanonicalHostName
else
- host
+ advertisedHost
val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
- ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, zkSessionTimeoutMs, jmxPort)
+ ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs,
jmxPort)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5e35a89..5e34f95 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
topicConfigManager.startup()
/* tell everyone we are alive */
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port,
config.zkSessionTimeoutMs, zkClient)
+ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort,
config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 2fb059b..4b2e4ad 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -49,15 +49,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
private var servers = List.empty[KafkaServer]
private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
- private val config1 = new KafkaConfig(props1) {
- override val hostName = "localhost"
- override val numPartitions = 4
- }
+ props1.put("num.partitions", "4")
+ private val config1 = new KafkaConfig(props1)
private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- private val config2 = new KafkaConfig(props2) {
- override val hostName = "localhost"
- override val numPartitions = 4
- }
+ props2.put("num.partitions", "4")
+ private val config2 = new KafkaConfig(props2)
override def setUp() {
super.setUp()
http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
new file mode 100644
index 0000000..f0c4a56
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import junit.framework.Assert._
+import kafka.utils.{ZkUtils, Utils, TestUtils}
+
+class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
+ var server : KafkaServer = null
+ val brokerId = 0
+ val advertisedHostName = "routable-host"
+ val advertisedPort = 1234
+
+ override def setUp() {
+ super.setUp()
+ val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
+ props.put("advertised.host.name", advertisedHostName)
+ props.put("advertised.port", advertisedPort.toString)
+
+ server = TestUtils.createServer(new KafkaConfig(props))
+ }
+
+ override def tearDown() {
+ server.shutdown()
+ Utils.rm(server.config.logDirs)
+ super.tearDown()
+ }
+
+ def testBrokerAdvertiseToZK {
+ val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId)
+ assertEquals(advertisedHostName, brokerInfo.get.host)
+ assertEquals(advertisedPort, brokerInfo.get.port)
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2f75e1d..89c207a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -64,4 +64,34 @@ class KafkaConfigTest extends JUnit3Suite {
}
-}
\ No newline at end of file
+ @Test
+ def testAdvertiseDefaults() {
+ val port = 9999
+ val hostName = "fake-host"
+
+ val props = TestUtils.createBrokerConfig(0, port)
+ props.put("host.name", hostName)
+
+ val serverConfig = new KafkaConfig(props)
+
+ assertEquals(serverConfig.advertisedHostName, hostName)
+ assertEquals(serverConfig.advertisedPort, port)
+ }
+
+ @Test
+ def testAdvertiseConfigured() {
+ val port = 9999
+ val advertisedHostName = "routable-host"
+ val advertisedPort = 1234
+
+ val props = TestUtils.createBrokerConfig(0, port)
+ props.put("advertised.host.name", advertisedHostName)
+ props.put("advertised.port", advertisedPort.toString)
+
+ val serverConfig = new KafkaConfig(props)
+
+ assertEquals(serverConfig.advertisedHostName, advertisedHostName)
+ assertEquals(serverConfig.advertisedPort, advertisedPort)
+ }
+
+}
|