kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1092; Add server config parameter to separate bind address and ZK hostname; patched by Roger Hoover; reviewed by Jun Rao
Date Thu, 31 Oct 2013 04:06:42 GMT
Updated Branches:
  refs/heads/trunk bf4dbd5ee -> a55ec0620


kafka-1092; Add server config parameter to separate bind address and ZK hostname; patched by Roger Hoover; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a55ec062
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a55ec062
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a55ec062

Branch: refs/heads/trunk
Commit: a55ec0620f6ce805fafe2e1d4035ec3e0ab4e0d0
Parents: bf4dbd5
Author: Roger Hoover <roger.hoover@gmail.com>
Authored: Wed Oct 30 21:06:23 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 30 21:06:23 2013 -0700

----------------------------------------------------------------------
 config/server.properties                        | 13 +++--
 .../main/scala/kafka/server/KafkaConfig.scala   | 13 ++++-
 .../scala/kafka/server/KafkaHealthcheck.scala   | 14 +++---
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../unit/kafka/producer/ProducerTest.scala      | 12 ++---
 .../unit/kafka/server/AdvertiseBrokerTest.scala | 52 ++++++++++++++++++++
 .../unit/kafka/server/KafkaConfigTest.scala     | 34 ++++++++++++-
 7 files changed, 118 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 2eccc5e..8efa83f 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -24,11 +24,18 @@ broker.id=0
 # The port the socket server listens on
 port=9092
 
-# Hostname the broker will bind to and advertise to producers and consumers.
-# If not set, the server will bind to all interfaces and advertise the value returned from
-# from java.net.InetAddress.getCanonicalHostName().
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
 #host.name=localhost
 
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
 # The number of threads handling network requests
 num.network.threads=2
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 74442b6..b324344 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -69,8 +69,19 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val port: Int = props.getInt("port", 6667)
 
   /* hostname of broker. If this is set, it will only bind to this address. If this is not set,
-   * it will bind to all interfaces, and publish one to ZK */
+   * it will bind to all interfaces */
   val hostName: String = props.getString("host.name", null)
+  
+  /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may
+   * need to be different from the interface to which the broker binds. If this is not set,
+   * it will use the value for "host.name" if configured. Otherwise
+   * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */
+  val advertisedHostName: String = props.getString("advertised.host.name", hostName)
+    
+  /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may
+   * need to be different from the port to which the broker binds. If this is not set,
+   * it will publish the same port that the broker binds to. */
+  val advertisedPort: Int = props.getInt("advertised.port", port)
 
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 84ea17a..9dca55c 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -27,14 +27,14 @@ import java.net.InetAddress
 /**
  * This class registers the broker in zookeeper to allow 
  * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
- *   /brokers/[0...N] --> host:port
+ *   /brokers/[0...N] --> advertisedHost:advertisedPort
  *   
  * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
  * we are dead.
  */
 class KafkaHealthcheck(private val brokerId: Int, 
-                       private val host: String, 
-                       private val port: Int,
+                       private val advertisedHost: String, 
+                       private val advertisedPort: Int,
                        private val zkSessionTimeoutMs: Int,
                        private val zkClient: ZkClient) extends Logging {
 
@@ -49,13 +49,13 @@ class KafkaHealthcheck(private val brokerId: Int,
    * Register this broker as "alive" in zookeeper
    */
   def register() {
-    val hostName = 
-      if(host == null || host.trim.isEmpty) 
+    val advertisedHostName = 
+      if(advertisedHost == null || advertisedHost.trim.isEmpty) 
         InetAddress.getLocalHost.getCanonicalHostName 
       else
-        host
+        advertisedHost
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, zkSessionTimeoutMs, jmxPort)
+    ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5e35a89..5e34f95 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     topicConfigManager.startup()
     
     /* tell everyone we are alive */
-    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, config.zkSessionTimeoutMs, zkClient)
+    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
     kafkaHealthcheck.startup()
 
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 2fb059b..4b2e4ad 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -49,15 +49,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private var servers = List.empty[KafkaServer]
 
   private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  private val config1 = new KafkaConfig(props1) {
-    override val hostName = "localhost"
-    override val numPartitions = 4
-  }
+  props1.put("num.partitions", "4")
+  private val config1 = new KafkaConfig(props1)
   private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  private val config2 = new KafkaConfig(props2) {
-    override val hostName = "localhost"
-    override val numPartitions = 4
-  }
+  props2.put("num.partitions", "4")
+  private val config2 = new KafkaConfig(props2)
 
   override def setUp() {
     super.setUp()

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
new file mode 100644
index 0000000..f0c4a56
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import junit.framework.Assert._
+import kafka.utils.{ZkUtils, Utils, TestUtils}
+
+class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
+  var server : KafkaServer = null
+  val brokerId = 0
+  val advertisedHostName = "routable-host"
+  val advertisedPort = 1234
+
+  override def setUp() {
+    super.setUp()
+    val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
+    props.put("advertised.host.name", advertisedHostName)
+    props.put("advertised.port", advertisedPort.toString)
+    
+    server = TestUtils.createServer(new KafkaConfig(props))
+  }
+
+  override def tearDown() {
+    server.shutdown()
+    Utils.rm(server.config.logDirs)
+    super.tearDown()
+  }
+  
+  def testBrokerAdvertiseToZK {
+    val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId)
+    assertEquals(advertisedHostName, brokerInfo.get.host)
+    assertEquals(advertisedPort, brokerInfo.get.port)
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2f75e1d..89c207a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -64,4 +64,34 @@ class KafkaConfigTest extends JUnit3Suite {
 
   }
 
-}
\ No newline at end of file
+  @Test
+  def testAdvertiseDefaults() {
+    val port = 9999
+    val hostName = "fake-host"
+    
+    val props = TestUtils.createBrokerConfig(0, port)
+    props.put("host.name", hostName)
+    
+    val serverConfig = new KafkaConfig(props)
+    
+    assertEquals(serverConfig.advertisedHostName, hostName)
+    assertEquals(serverConfig.advertisedPort, port)
+  }
+
+  @Test
+  def testAdvertiseConfigured() {
+    val port = 9999
+    val advertisedHostName = "routable-host"
+    val advertisedPort = 1234
+    
+    val props = TestUtils.createBrokerConfig(0, port)
+    props.put("advertised.host.name", advertisedHostName)
+    props.put("advertised.port", advertisedPort.toString)
+    
+    val serverConfig = new KafkaConfig(props)
+    
+    assertEquals(serverConfig.advertisedHostName, advertisedHostName)
+    assertEquals(serverConfig.advertisedPort, advertisedPort)
+  }
+  
+}


Mime
View raw message