kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6656; Config tool should return non-zero status code on failure (#4711)
Date Thu, 15 Mar 2018 17:33:40 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 372b4c6  KAFKA-6656; Config tool should return non-zero status code on failure (#4711)
372b4c6 is described below

commit 372b4c6a775461c6ca54d4c58078718f5f046216
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Mar 15 10:33:29 2018 -0700

    KAFKA-6656; Config tool should return non-zero status code on failure (#4711)
    
    Prior to this patch, we caught some exceptions when executing the command, which meant
that it would return with status code zero. This patch fixes this and makes the expected exit
behavior explicit. Test cases have been added to verify the change.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 67 +++++++++++-----------
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 59 ++++++++++++++++---
 2 files changed, 86 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index ddf6dcd..044be6a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -25,11 +25,11 @@ import kafka.common.Config
 import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils.CommandLineUtils
+import kafka.utils.{CommandLineUtils, Exit}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AlterConfigsOptions, Config => JConfig, ConfigEntry,
DescribeConfigsOptions, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions,
AdminClient => JAdminClient, Config => JConfig}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
@@ -65,35 +65,43 @@ object ConfigCommand extends Config {
     DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
 
   def main(args: Array[String]): Unit = {
+    try {
+      val opts = new ConfigCommandOptions(args)
 
-    val opts = new ConfigCommandOptions(args)
-
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic,
client, user or broker")
-
-    opts.checkArgs()
-
-    val time = Time.SYSTEM
+      if (args.length == 0)
+        CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic,
client, user or broker")
 
-    if (opts.options.has(opts.zkConnectOpt)) {
-      val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled,
30000, 30000,
-        Int.MaxValue, time)
-      val adminZkClient = new AdminZkClient(zkClient)
+      opts.checkArgs()
 
-      try {
-        if (opts.options.has(opts.alterOpt))
-          alterConfig(zkClient, opts, adminZkClient)
-        else if (opts.options.has(opts.describeOpt))
-          describeConfig(zkClient, opts, adminZkClient)
-      } catch {
-        case e: Throwable =>
-          println("Error while executing config command " + e.getMessage)
-          println(Utils.stackTrace(e))
-      } finally {
-        zkClient.close()
+      if (opts.options.has(opts.zkConnectOpt)) {
+        processCommandWithZk(opts.options.valueOf(opts.zkConnectOpt), opts)
+      } else {
+        processBrokerConfig(opts)
       }
-    } else {
-      processBrokerConfig(opts)
+    } catch {
+      case e @ (_: IllegalArgumentException | _: InvalidConfigException | _: OptionException)
=>
+        logger.debug(s"Failed config command with args $args", e)
+        System.err.println(e.getMessage)
+        Exit.exit(1)
+
+      case t: Throwable =>
+        System.err.println(s"Error while executing config command with args $args")
+        t.printStackTrace(System.err)
+        Exit.exit(1)
+    }
+  }
+
+  private def processCommandWithZk(zkConnectString: String, opts: ConfigCommandOptions):
Unit = {
+    val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSecurityEnabled, 30000, 30000,
+      Int.MaxValue, Time.SYSTEM)
+    val adminZkClient = new AdminZkClient(zkClient)
+    try {
+      if (opts.options.has(opts.alterOpt))
+        alterConfig(zkClient, opts, adminZkClient)
+      else if (opts.options.has(opts.describeOpt))
+        describeConfig(zkClient, opts, adminZkClient)
+    } finally {
+      zkClient.close()
     }
   }
 
@@ -217,14 +225,9 @@ object ConfigCommand extends Config {
         alterBrokerConfig(adminClient, opts, entityName)
       else if (opts.options.has(opts.describeOpt))
         describeBrokerConfig(adminClient, opts, entityName)
-    } catch {
-      case e: Throwable =>
-        println("Error while executing config command " + e.getMessage)
-        println(Utils.stackTrace(e))
     } finally {
       adminClient.close()
     }
-
   }
 
   private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions,
entityName: String) {
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index b3c46fa..a17f060 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import kafka.common.InvalidConfigException
 import kafka.server.ConfigEntityName
-import kafka.utils.Logging
+import kafka.utils.{Exit, Logging}
 import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.config.ConfigResource
@@ -38,6 +38,49 @@ import scala.collection.mutable
 import scala.collection.JavaConverters._
 
 class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
+
+  @Test
+  def shouldExitWithNonZeroStatusOnArgError(): Unit = {
+    assertNonZeroStatusExit(Array("--blah"))
+  }
+
+  @Test
+  def shouldExitWithNonZeroStatusOnZkCommandError(): Unit = {
+    assertNonZeroStatusExit(Array(
+      "--zookeeper", zkConnect,
+      "--entity-name", "1",
+      "--entity-type", "brokers",
+      "--alter",
+      "--add-config", "message.max.size=100000"))
+  }
+
+  @Test
+  def shouldExitWithNonZeroStatusOnBrokerCommandError(): Unit = {
+    assertNonZeroStatusExit(Array(
+      "--bootstrap-server", "invalid host",
+      "--entity-type", "brokers",
+      "--entity-name", "1",
+      "--describe"))
+  }
+
+  private def assertNonZeroStatusExit(args: Array[String]): Unit = {
+    var exitStatus: Option[Int] = None
+    Exit.setExitProcedure { (status, _) =>
+      exitStatus = Some(status)
+      throw new RuntimeException
+    }
+
+    try {
+      ConfigCommand.main(args)
+    } catch {
+      case e: RuntimeException =>
+    } finally {
+      Exit.resetExitProcedure()
+    }
+
+    assertEquals(Some(1), exitStatus)
+  }
+
   @Test
   def shouldParseArgumentsForClientsEntityType() {
     testArgumentParse("clients")
@@ -111,7 +154,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
-    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+    class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
       override def changeClientIdConfig(clientId: String, configChange: Properties): Unit
= {
         assertEquals("my-client-id", clientId)
         assertEquals("b", configChange.get("a"))
@@ -130,7 +173,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
-    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+    class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
       override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
         assertEquals("my-topic", topic)
         assertEquals("b", configChange.get("a"))
@@ -149,7 +192,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "leader.replication.throttled.rate=10,follower.replication.throttled.rate=20"))
 
-    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+    class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
       override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit
= {
         assertEquals(Seq(1), brokerIds)
         assertEquals("10", configChange.get("leader.replication.throttled.rate"))
@@ -225,7 +268,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
 
-    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+    class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
       override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit
= {
         assertEquals(Seq(1), brokerIds)
         assertEquals("b", configChange.get("a"))
@@ -297,7 +340,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--delete-config", "a,c"))
 
-    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+    class TestAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
       override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {
         val properties: Properties = new Properties
         properties.put("a", "b")
@@ -332,7 +375,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         "--delete-config", mechanism))
 
     val credentials = mutable.Map[String, Properties]()
-    case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations:
Int) extends AdminZkClient(zkClient) {
+    case class CredentialChange(user: String, mechanisms: Set[String], iterations: Int) extends
AdminZkClient(zkClient) {
       override def fetchEntityConfig(entityType: String, entityName: String): Properties
= {
         credentials.getOrElse(entityName, new Properties())
       }
@@ -536,7 +579,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
   }
 
-  case class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient)
{
+  class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
     override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
     override def fetchEntityConfig(entityType: String, entityName: String): Properties =
{new Properties}
     override def changeClientIdConfig(clientId: String, configs: Properties): Unit = {}

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message