kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1654 Provide a way to override server configuration from command line; reviewed by Neha Narkhede
Date Mon, 24 Nov 2014 00:41:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f0fd70929 -> 409c367ce


KAFKA-1654 Provide a way to override server configuration from command line; reviewed by Neha
Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/409c367c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/409c367c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/409c367c

Branch: refs/heads/trunk
Commit: 409c367ceb5f9eae395cb346dd3fa02b8ee8ee70
Parents: f0fd709
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sun Nov 23 16:40:58 2014 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Sun Nov 23 16:41:24 2014 -0800

----------------------------------------------------------------------
 bin/kafka-server-start.sh                       |   2 +-
 core/src/main/scala/kafka/Kafka.scala           |  38 ++++--
 .../test/scala/unit/kafka/KafkaConfigTest.scala | 120 +++++++++++++++++++
 3 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/409c367c/bin/kafka-server-start.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-server-start.sh b/bin/kafka-server-start.sh
index 1737d0c..dc01d46 100755
--- a/bin/kafka-server-start.sh
+++ b/bin/kafka-server-start.sh
@@ -16,7 +16,7 @@
 
 if [ $# -lt 1 ];
 then
-	echo "USAGE: $0 [-daemon] server.properties"
+	echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
 	exit 1
 fi
 base_dir=$(dirname $0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/409c367c/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 2e94fee..77a49e1 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -17,22 +17,42 @@
 
 package kafka
 
-
+import scala.collection.JavaConversions._
+import joptsimple.OptionParser
 import metrics.KafkaMetricsReporter
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
-import utils.{Utils, Logging}
+import kafka.utils.{CommandLineUtils, Utils, Logging}
 
 object Kafka extends Logging {
 
-  def main(args: Array[String]): Unit = {
-    if (args.length != 1) {
-      println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
-      System.exit(1)
+  def getKafkaConfigFromArgs(args: Array[String]): KafkaConfig = {
+    val optionParser = new OptionParser
+    val overrideOpt = optionParser.accepts("override", "Optional property that should override
values set in server.properties file")
+      .withRequiredArg()
+      .ofType(classOf[String])
+
+    if (args.length == 0) {
+      CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties
[--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
+    }
+
+    val props = Utils.loadProps(args(0))
+
+    if(args.length > 1) {
+      val options = optionParser.parse(args.slice(1, args.length): _*)
+
+      if(options.nonOptionArguments().size() > 0) {
+        CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: "
+ options.nonOptionArguments().toArray.mkString(","))
+      }
+
+      props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)))
     }
-  
+
+    new KafkaConfig(props)
+  }
+
+  def main(args: Array[String]): Unit = {
     try {
-      val props = Utils.loadProps(args(0))
-      val serverConfig = new KafkaConfig(props)
+      val serverConfig = getKafkaConfigFromArgs(args)
       KafkaMetricsReporter.startReporters(serverConfig.props)
       val kafkaServerStartable = new KafkaServerStartable(serverConfig)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/409c367c/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
new file mode 100644
index 0000000..4d36b8b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka
+
+import java.io.{FileOutputStream, File}
+import java.security.Permission
+
+import kafka.Kafka
+import org.junit.{After, Before, Test}
+import junit.framework.Assert._
+
+class KafkaTest {
+
+  val originalSecurityManager: SecurityManager = System.getSecurityManager
+
+  class ExitCalled extends SecurityException {
+  }
+
+  private class NoExitSecurityManager extends SecurityManager {
+    override def checkExit(status: Int): Unit = {
+      throw new ExitCalled
+    }
+
+    override def checkPermission(perm : Permission): Unit = {
+    }
+
+    override def checkPermission(perm : Permission, context: Object): Unit = {
+    }
+  }
+
+  @Before
+  def setSecurityManager() : Unit = {
+    System.setSecurityManager(new NoExitSecurityManager)
+  }
+
+  @After
+  def setOriginalSecurityManager() : Unit = {
+    System.setSecurityManager(originalSecurityManager)
+  }
+
+  @Test
+  def testGetKafkaConfigFromArgs(): Unit = {
+    val propertiesFile = prepareDefaultConfig()
+
+    // We should load configuration file without any arguments
+    val config1 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile))
+    assertEquals(1, config1.brokerId)
+
+    // We should be able to override given property on command line
+    val config2 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "broker.id=2"))
+    assertEquals(2, config2.brokerId)
+
+    // We should be also able to set completely new property
+    val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987"))
+    assertEquals(1, config3.brokerId)
+    assertEquals(1987, config3.port)
+
+    // We should be also able to set several properties
+    val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987",
"--override", "broker.id=2"))
+    assertEquals(2, config4.brokerId)
+    assertEquals(1987, config4.port)
+  }
+
+  @Test(expected = classOf[ExitCalled])
+  def testGetKafkaConfigFromArgsWrongSetValue(): Unit = {
+    val propertiesFile = prepareDefaultConfig()
+    Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "a=b=c"))
+  }
+
+  @Test(expected = classOf[ExitCalled])
+  def testGetKafkaConfigFromArgsNonArgsAtTheEnd(): Unit = {
+    val propertiesFile = prepareDefaultConfig()
+    Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "broker.id=1", "broker.id=2"))
+  }
+
+  @Test(expected = classOf[ExitCalled])
+  def testGetKafkaConfigFromArgsNonArgsOnly(): Unit = {
+    val propertiesFile = prepareDefaultConfig()
+    Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "broker.id=1", "broker.id=2"))
+  }
+
+  @Test(expected = classOf[ExitCalled])
+  def testGetKafkaConfigFromArgsNonArgsAtTheBegging(): Unit = {
+    val propertiesFile = prepareDefaultConfig()
+    Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))
+  }
+
+  def prepareDefaultConfig(): String = {
+    prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
+  }
+
+  def prepareConfig(lines : Array[String]): String = {
+    val file = File.createTempFile("kafkatest", ".properties")
+    file.deleteOnExit()
+
+    val writer = new FileOutputStream(file)
+    lines.foreach { l =>
+      writer.write(l.getBytes)
+      writer.write("\n".getBytes)
+    }
+
+    writer.close
+
+    file.getAbsolutePath
+  }
+}


Mime
View raw message