kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-6805: Enable broker configs to be stored in ZK before broker start (#4898)
Date Mon, 18 Jun 2018 17:32:11 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 8ad019d  KAFKA-6805: Enable broker configs to be stored in ZK before broker start
(#4898)
8ad019d is described below

commit 8ad019de1d4f8d918a5f89b674a8779ab1cbf390
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Mon Jun 18 18:28:08 2018 +0100

    KAFKA-6805: Enable broker configs to be stored in ZK before broker start (#4898)
    
    Support configuration of dynamic broker configs in ZooKeeper before starting brokers using
ConfigCommand. This will allow password configs to be encrypted and stored in ZooKeeper, without
requiring clear passwords in server.properties to bootstrap the broker first.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  90 +++++++++++++--
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  72 ++++++------
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  22 ++--
 .../server/DynamicBrokerReconfigurationTest.scala  |  49 +++++---
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 125 ++++++++++++++++++---
 docs/configuration.html                            |  17 +++
 docs/upgrade.html                                  |   2 +
 7 files changed, 295 insertions(+), 82 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index c1b384f..6ac0a01 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -24,13 +24,14 @@ import joptsimple._
 import kafka.common.Config
 import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
-import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils.{CommandLineUtils, Exit}
+import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig,
KafkaConfig}
+import kafka.utils.{CommandLineUtils, Exit, PasswordEncoder}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions,
AdminClient => JAdminClient, Config => JConfig}
 import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter,
ScramMechanism}
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@@ -56,11 +57,14 @@ import scala.collection.JavaConverters._
 object ConfigCommand extends Config {
 
   val DefaultScramIterations = 4096
-  // Dynamic broker configs can only be updated using the new AdminClient since they may
require
-  // password encryption currently implemented only in the broker. For consistency with older
versions,
-  // quota-related broker configs can still be updated using ZooKeeper. ConfigCommand will
be migrated
-  // fully to the new AdminClient later (KIP-248).
-  val BrokerConfigsUpdatableUsingZooKeeper = Set(DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
+  // Dynamic broker configs can only be updated using the new AdminClient once brokers have
started
+  // so that configs may be fully validated. Prior to starting brokers, updates may be performed
using
+  // ZooKeeper for bootstrapping. This allows all password configs to be stored encrypted
in ZK,
+  // avoiding clear passwords in server.properties. For consistency with older versions,
quota-related
+  // broker configs can still be updated using ZooKeeper at any time. ConfigCommand will
be migrated
+  // to the new AdminClient later for these configs (KIP-248).
+  val BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning = Set(
+    DynamicConfig.Broker.LeaderReplicationThrottledRateProp,
     DynamicConfig.Broker.FollowerReplicationThrottledRateProp,
     DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
 
@@ -114,9 +118,25 @@ object ConfigCommand extends Config {
 
     if (entityType == ConfigType.User)
       preProcessScramCredentials(configsToBeAdded)
-    if (entityType == ConfigType.Broker) {
-      require(configsToBeAdded.asScala.keySet.forall(BrokerConfigsUpdatableUsingZooKeeper.contains),
-        s"--bootstrap-server option must be specified to update broker configs $configsToBeAdded")
+    else if (entityType == ConfigType.Broker) {
+      // Replication quota configs may be updated using ZK at any time. Other dynamic broker
configs
+      // may be updated using ZooKeeper only if the corresponding broker is not running.
Dynamic broker
+      // configs at cluster-default level may be configured using ZK only if there are no
brokers running.
+      val dynamicBrokerConfigs = configsToBeAdded.asScala.keySet.filterNot(BrokerConfigsUpdatableUsingZooKeeperWhileBrokerRunning.contains)
+      if (dynamicBrokerConfigs.nonEmpty) {
+        val perBrokerConfig = entityName != ConfigEntityName.Default
+        val errorMessage = s"--bootstrap-server option must be specified to update broker
configs $dynamicBrokerConfigs."
+        val info = "Broker configuraton updates using ZooKeeper are supported for bootstrapping
before brokers" +
+          " are started to enable encrypted password configs to be stored in ZooKeeper."
+        if (perBrokerConfig) {
+          adminZkClient.parseBroker(entityName).foreach { brokerId =>
+            require(zkClient.getBroker(brokerId).isEmpty, s"$errorMessage when broker $entityName
is running. $info")
+          }
+        } else {
+          require(zkClient.getAllBrokersInCluster.isEmpty, s"$errorMessage for default cluster
if any broker is running. $info")
+        }
+        preProcessBrokerConfigs(configsToBeAdded, perBrokerConfig)
+      }
     }
 
     // compile the final set of configs
@@ -156,6 +176,49 @@ object ConfigCommand extends Config {
     }
   }
 
+  private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder
= {
+    encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
+    val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
+      throw new IllegalArgumentException("Password encoder secret not specified"))
+    new PasswordEncoder(new Password(encoderSecret),
+      None,
+      encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
+      encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
+      encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderIterations))
+  }
+
+  /**
+   * Pre-process broker configs provided to convert them to persistent format.
+   * Password configs are encrypted using the secret `KafkaConfig.PasswordEncoderSecretProp`.
+   * The secret is removed from `configsToBeAdded` and will not be persisted in ZooKeeper.
+   */
+  private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean)
{
+    val passwordEncoderConfigs = new Properties
+    passwordEncoderConfigs ++= configsToBeAdded.asScala.filterKeys(_.startsWith("password.encoder."))
+    if (!passwordEncoderConfigs.isEmpty) {
+      info(s"Password encoder configs ${passwordEncoderConfigs.keySet} will be used for encrypting"
+
+        " passwords, but will not be stored in ZooKeeper.")
+      passwordEncoderConfigs.asScala.keySet.foreach(configsToBeAdded.remove)
+    }
+
+    DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
+    val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
+    if (passwordConfigs.nonEmpty) {
+      require(passwordEncoderConfigs.containsKey(KafkaConfig.PasswordEncoderSecretProp),
+        s"${KafkaConfig.PasswordEncoderSecretProp} must be specified to update $passwordConfigs."
+
+          " Other password encoder configs like cipher algorithm and iterations may also
be specified" +
+          " to override the default encoding parameters. Password encoder configs will not
be persisted" +
+          " in ZooKeeper."
+      )
+
+      val passwordEncoder = createPasswordEncoder(passwordEncoderConfigs.asScala)
+      passwordConfigs.foreach { configName =>
+        val encodedValue = passwordEncoder.encode(new Password(configsToBeAdded.getProperty(configName)))
+        configsToBeAdded.setProperty(configName, encodedValue)
+      }
+    }
+  }
+
   private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient:
AdminZkClient) {
     val configEntity = parseEntity(opts)
     val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined
&& !configEntity.child.isDefined
@@ -358,7 +421,12 @@ object ConfigCommand extends Config {
       parseQuotaEntity(opts)
     else {
       // Exactly one entity type and at-most one entity name expected for other entities
-      val name = if (opts.options.has(opts.entityName)) Some(opts.options.valueOf(opts.entityName))
else None
+      val name = if (opts.options.has(opts.entityName))
+        Some(opts.options.valueOf(opts.entityName))
+      else if (entityTypes.head == ConfigType.Broker && opts.options.has(opts.entityDefault))
+        Some(ConfigEntityName.Default)
+      else
+        None
       ConfigEntity(Entity(entityTypes.head, name), None)
     }
   }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 4225cdb..72772fa 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -115,6 +115,43 @@ object DynamicBrokerConfig {
     }
   }
 
+  def validateConfigs(props: Properties, perBrokerConfig: Boolean): Unit =  {
+    def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
+      if (invalidPropNames.nonEmpty)
+        throw new ConfigException(s"$errorMessage: $invalidPropNames")
+    }
+    checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically")
+    checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
+      "These security configs can be dynamically updated only per-listener using the listener
prefix")
+    validateConfigTypes(props)
+    if (!perBrokerConfig) {
+      checkInvalidProps(perBrokerConfigs(props),
+        "Cannot update these configs at default cluster level, broker id must be specified")
+    }
+  }
+
+  private def perBrokerConfigs(props: Properties): Set[String] = {
+    val configNames = props.asScala.keySet
+    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
+  }
+
+  private def nonDynamicConfigs(props: Properties): Set[String] = {
+    props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
+  }
+
+  private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
+    DynamicSecurityConfigs.filter(props.containsKey)
+  }
+
+  private def validateConfigTypes(props: Properties): Unit = {
+    val baseProps = new Properties
+    props.asScala.foreach {
+      case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
+      case (k, v) => baseProps.put(k, v)
+    }
+    DynamicConfig.Broker.validate(baseProps)
+  }
+
   private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
     KafkaConfig.configKeys.filterKeys(AllDynamicConfigs.contains).values.foreach { config
=>
       configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
@@ -298,57 +335,26 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
             decoded.foreach { value => props.put(configName, passwordEncoder.encode(new
Password(value))) }
           }
         }
-        adminZkClient.changeBrokerConfig(Seq(kafkaConfig.brokerId), props)
+        adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props)
       }
     }
     props
   }
 
   private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock)
{
-    def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
-      if (invalidPropNames.nonEmpty)
-        throw new ConfigException(s"$errorMessage: $invalidPropNames")
-    }
-    checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically")
-    checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
-      "These security configs can be dynamically updated only per-listener using the listener
prefix")
-    validateConfigTypes(props)
+    validateConfigs(props, perBrokerConfig)
     val newProps = mutable.Map[String, String]()
     newProps ++= staticBrokerConfigs
     if (perBrokerConfig) {
       overrideProps(newProps, dynamicDefaultConfigs)
       overrideProps(newProps, props.asScala)
     } else {
-      checkInvalidProps(perBrokerConfigs(props),
-        "Cannot update these configs at default cluster level, broker id must be specified")
       overrideProps(newProps, props.asScala)
       overrideProps(newProps, dynamicBrokerConfigs)
     }
     processReconfiguration(newProps, validateOnly = true)
   }
 
-  private def perBrokerConfigs(props: Properties): Set[String] = {
-    val configNames = props.asScala.keySet
-    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
-  }
-
-  private def nonDynamicConfigs(props: Properties): Set[String] = {
-    props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
-  }
-
-  private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
-    DynamicSecurityConfigs.filter(props.containsKey)
-  }
-
-  private def validateConfigTypes(props: Properties): Unit = {
-    val baseProps = new Properties
-    props.asScala.foreach {
-      case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
-      case (k, v) => baseProps.put(k, v)
-    }
-    DynamicConfig.Broker.validate(baseProps)
-  }
-
   private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
     try {
       validateConfigTypes(props)
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 2f8da36..8a6b3ee 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -265,6 +265,18 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
     }
   }
 
+  def parseBroker(broker: String): Option[Int] = {
+    broker match {
+      case ConfigEntityName.Default => None
+      case _ =>
+        try Some(broker.toInt)
+        catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's
Entity Name must be a single integer value")
+        }
+    }
+  }
+
   /**
    * Change the configs for a given entityType and entityName
    * @param entityType
@@ -273,19 +285,11 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
    */
   def changeConfigs(entityType: String, entityName: String, configs: Properties): Unit =
{
 
-    def parseBroker(broker: String): Int = {
-      try broker.toInt
-      catch {
-        case _: NumberFormatException =>
-          throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's
Entity Name must be a single integer value")
-      }
-    }
-
     entityType match {
       case ConfigType.Topic => changeTopicConfig(entityName, configs)
       case ConfigType.Client => changeClientIdConfig(entityName, configs)
       case ConfigType.User => changeUserOrUserClientIdConfig(entityName, configs)
-      case ConfigType.Broker => changeBrokerConfig(Seq(parseBroker(entityName)), configs)
+      case ConfigType.Broker => changeBrokerConfig(parseBroker(entityName), configs)
       case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType.
Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}")
     }
   }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 69ca317..52ad2b9 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -126,6 +126,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
       props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
 
       val kafkaConfig = KafkaConfig.fromProps(props)
+      configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
 
       servers += TestUtils.createServer(kafkaConfig)
     }
@@ -778,21 +779,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness
with SaslSet
       val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
       val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties]
       val config = server.config
-      val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password
encoder secret not configured"))
       val oldSecret = "old-dynamic-config-secret"
       config.dynamicConfig.staticBrokerConfigs.put(KafkaConfig.PasswordEncoderOldSecretProp,
oldSecret)
       val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.isPasswordConfig)
       assertTrue("Password configs not found", passwordConfigs.nonEmpty)
-      val passwordDecoder = new PasswordEncoder(secret,
-        config.passwordEncoderKeyFactoryAlgorithm,
-        config.passwordEncoderCipherAlgorithm,
-        config.passwordEncoderKeyLength,
-        config.passwordEncoderIterations)
-      val passwordEncoder = new PasswordEncoder(new Password(oldSecret),
-        config.passwordEncoderKeyFactoryAlgorithm,
-        config.passwordEncoderCipherAlgorithm,
-        config.passwordEncoderKeyLength,
-        config.passwordEncoderIterations)
+      val passwordDecoder = createPasswordEncoder(config, config.passwordEncoderSecret)
+      val passwordEncoder = createPasswordEncoder(config, Some(new Password(oldSecret)))
       passwordConfigs.foreach { case (name, value) =>
         val decoded = passwordDecoder.decode(value).value
         propsEncodedWithOldSecret.put(name, passwordEncoder.encode(new Password(decoded)))
@@ -1161,12 +1153,39 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness
with SaslSet
 
   private def listenerPrefix(name: String): String = new ListenerName(name).configPrefix
 
-  private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int],
sslProperties: Properties): Unit = {
+  private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, sslProperties:
Properties): Unit = {
+    val externalListenerPrefix = listenerPrefix(SecureExternal)
     val sslStoreProps = new Properties
-    sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
-    val persistentProps = kafkaConfig.dynamicConfig.toPersistentProps(sslStoreProps, perBrokerConfig
= true)
+    sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, externalListenerPrefix)
+    sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
     zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
-    adminZkClient.changeBrokerConfig(brokers, persistentProps)
+
+    val args = Array("--zookeeper", kafkaConfig.zkConnect,
+      "--alter", "--add-config", sslStoreProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
+      "--entity-type", "brokers",
+      "--entity-name", kafkaConfig.brokerId.toString)
+    ConfigCommand.main(args)
+
+    val passwordEncoder = createPasswordEncoder(kafkaConfig, kafkaConfig.passwordEncoderSecret)
+    val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString)
+    assertEquals(4, brokerProps.size)
+    assertEquals(sslProperties.get(SSL_KEYSTORE_TYPE_CONFIG),
+      brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_TYPE_CONFIG"))
+    assertEquals(sslProperties.get(SSL_KEYSTORE_LOCATION_CONFIG),
+      brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_LOCATION_CONFIG"))
+    assertEquals(sslProperties.get(SSL_KEYSTORE_PASSWORD_CONFIG),
+      passwordEncoder.decode(brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEYSTORE_PASSWORD_CONFIG")))
+    assertEquals(sslProperties.get(SSL_KEY_PASSWORD_CONFIG),
+      passwordEncoder.decode(brokerProps.getProperty(s"$externalListenerPrefix$SSL_KEY_PASSWORD_CONFIG")))
+  }
+
+  private def createPasswordEncoder(config: KafkaConfig, secret: Option[Password]): PasswordEncoder
= {
+    val encoderSecret = secret.getOrElse(throw new IllegalStateException("Password encoder
secret not configured"))
+    new PasswordEncoder(encoderSecret,
+      config.passwordEncoderKeyFactoryAlgorithm,
+      config.passwordEncoderCipherAlgorithm,
+      config.passwordEncoderKeyLength,
+      config.passwordEncoderIterations)
   }
 
   private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000):
Unit = {
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index a24800f..2644dcc 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -20,21 +20,25 @@ import java.util
 import java.util.Properties
 
 import kafka.admin.ConfigCommand.ConfigCommandOptions
+import kafka.api.ApiVersion
+import kafka.cluster.{Broker, EndPoint}
 import kafka.common.InvalidConfigException
-import kafka.server.ConfigEntityName
+import kafka.server.{ConfigEntityName, KafkaConfig}
 import kafka.utils.{Exit, Logging}
-import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.mutable
+import scala.collection.{Seq, mutable}
 import scala.collection.JavaConverters._
 
 class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@@ -51,7 +55,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-name", "1",
       "--entity-type", "brokers",
       "--alter",
-      "--add-config", "message.max.size=100000"))
+      "--add-config", "security.inter.broker.protocol=PLAINTEXT"))
   }
 
   @Test
@@ -306,14 +310,99 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
-  @Test (expected = classOf[IllegalArgumentException])
-  def shouldNotUpdateDynamicBrokerConfigUsingZooKeeper(): Unit = {
-    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-      "--entity-name", "1",
-      "--entity-type", "brokers",
-      "--alter",
-      "--add-config", "message.max.size=100000"))
-    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
+  @Test
+  def testDynamicBrokerConfigUpdateUsingZooKeeper(): Unit = {
+    val brokerId = "1"
+    val adminZkClient = new AdminZkClient(zkClient)
+    val alterOpts = Array("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter")
+
+    def entityOpt(brokerId: Option[String]): Array[String] = {
+      brokerId.map(id => Array("--entity-name", id)).getOrElse(Array("--entity-default"))
+    }
+
+    def alterConfig(configs: Map[String, String], brokerId: Option[String],
+                    encoderConfigs: Map[String, String] = Map.empty): Unit = {
+      val configStr = (configs ++ encoderConfigs).map { case (k, v) => s"$k=$v" }.mkString(",")
+      val addOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId) ++ Array("--add-config",
configStr))
+      ConfigCommand.alterConfig(zkClient, addOpts, adminZkClient)
+    }
+
+    def verifyConfig(configs: Map[String, String], brokerId: Option[String]): Unit = {
+      val entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.getOrElse(ConfigEntityName.Default))
+      assertEquals(configs, entityConfigs.asScala)
+    }
+
+    def alterAndVerifyConfig(configs: Map[String, String], brokerId: Option[String]): Unit
= {
+      alterConfig(configs, brokerId)
+      verifyConfig(configs, brokerId)
+    }
+
+    def deleteAndVerifyConfig(configNames: Set[String], brokerId: Option[String]): Unit =
{
+      val deleteOpts = new ConfigCommandOptions(alterOpts ++ entityOpt(brokerId) ++
+        Array("--delete-config", configNames.mkString(",")))
+      ConfigCommand.alterConfig(zkClient, deleteOpts, adminZkClient)
+      verifyConfig(Map.empty, brokerId)
+    }
+
+    // Add config
+    alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId))
+    alterAndVerifyConfig(Map("message.max.size" -> "120000"), None)
+
+    // Change config
+    alterAndVerifyConfig(Map("message.max.size" -> "130000"), Some(brokerId))
+    alterAndVerifyConfig(Map("message.max.size" -> "140000"), None)
+
+    // Delete config
+    deleteAndVerifyConfig(Set("message.max.size"), Some(brokerId))
+    deleteAndVerifyConfig(Set("message.max.size"), None)
+
+    // Listener configs: should work only with listener name
+    alterAndVerifyConfig(Map("listener.name.external.ssl.keystore.location" -> "/tmp/test.jks"),
Some(brokerId))
+    intercept[ConfigException](alterConfig(Map("ssl.keystore.location" -> "/tmp/test.jks"),
Some(brokerId)))
+
+    // Per-broker config configured at default cluster-level should fail
+    intercept[ConfigException](alterConfig(Map("listener.name.external.ssl.keystore.location"
-> "/tmp/test.jks"), None))
+    deleteAndVerifyConfig(Set("listener.name.external.ssl.keystore.location"), Some(brokerId))
+
+    // Password config update without encoder secret should fail
+    intercept[IllegalArgumentException](alterConfig(Map("listener.name.external.ssl.keystore.password"
-> "secret"), Some(brokerId)))
+
+    // Password config update with encoder secret should succeed and encoded password must
be stored in ZK
+    val configs = Map("listener.name.external.ssl.keystore.password" -> "secret", "log.cleaner.threads"
-> "2")
+    val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret")
+    alterConfig(configs, Some(brokerId), encoderConfigs)
+    val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
+    assertFalse("Encoder secret stored in ZooKeeper", brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp))
+    assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not encoded
+    val encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
+    val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
+    assertEquals("secret", passwordEncoder.decode(encodedPassword).value)
+    assertEquals(configs.size, brokerConfigs.size)
+
+    // Password config update with overrides for encoder parameters
+    val configs2 = Map("listener.name.internal.ssl.keystore.password" -> "secret2")
+    val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret",
+      KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding",
+      KafkaConfig.PasswordEncoderIterationsProp -> "1024",
+      KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> "PBKDF2WithHmacSHA1",
+      KafkaConfig.PasswordEncoderKeyLengthProp -> "64")
+    alterConfig(configs2, Some(brokerId), encoderConfigs2)
+    val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
+    val encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")
+    assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs).decode(encodedPassword2).value)
+    assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs2).decode(encodedPassword2).value)
+
+
+    // Password config update at default cluster-level should fail
+    intercept[ConfigException](alterConfig(configs, None, encoderConfigs))
+
+    // Dynamic config updates using ZK should fail if broker is running.
+    registerBrokerInZk(brokerId.toInt)
+    intercept[IllegalArgumentException](alterConfig(Map("message.max.size" -> "210000"),
Some(brokerId)))
+    intercept[IllegalArgumentException](alterConfig(Map("message.max.size" -> "220000"),
None))
+
+    // Dynamic config updates using ZK should for a different broker that is not running
should succeed
+    alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
   }
 
   @Test (expected = classOf[IllegalArgumentException])
@@ -322,7 +411,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-name", "1",
       "--entity-type", "brokers",
       "--alter",
-      "--add-config", "a="))
+      "--add-config", "a=="))
     ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
@@ -593,6 +682,14 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
   }
 
+  private def registerBrokerInZk(id: Int): Unit = {
+    zkClient.createTopLevelPaths()
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol),
securityProtocol)
+    val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), ApiVersion.latestVersion,
jmxPort = 9192)
+    zkClient.registerBrokerInZk(brokerInfo)
+  }
+
   class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
     override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
     override def fetchEntityConfig(entityType: String, entityName: String): Properties =
{new Properties}
diff --git a/docs/configuration.html b/docs/configuration.html
index 8c86534..90c990b 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -90,6 +90,23 @@
   using <code>kafka-configs.sh</code> even if the password config is not being
altered. This constraint will be removed in
   a future release.</p>
 
+  <h5>Updating Password Configs in ZooKeeper Before Starting Brokers</h5>
+
+  From Kafka 2.0.0 onwards, <code>kafka-configs.sh</code> enables dynamic broker
configs to be updated using ZooKeeper before
+  starting brokers for bootstrapping. This enables all password configs to be stored in encrypted
form, avoiding the need for
+  clear passwords in <code>server.properties</code>. The broker config <code>password.encoder.secret</code>
must also be specified
+  if any password configs are included in the alter command. Additional encryption parameters
may also be specified. Password
+  encoder configs will not be persisted in ZooKeeper. For example, to store SSL key password
for listener <code>INTERNAL</code>
+  on broker 0:
+
+  <pre class="brush: bash;">
+  &gt; bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name
0 --alter --add-config
+    'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192'
+  </pre>
+
+  The configuration <code>listener.name.internal.ssl.key.password</code> will
be persisted in ZooKeeper in encrypted
+  form using the provided encoder configs. The encoder secret and iterations are not persisted
in ZooKeeper.
+
   <h5>Updating SSL Keystore of an Existing Listener</h5>
   Brokers may be configured with SSL keystores with short validity periods to reduce the
risk of compromised certificates.
   Keystores may be updated dynamically without restarting the broker. The config name must
be prefixed with the listener prefix
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 89c90d1..1349836 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -127,6 +127,8 @@
     <p>KIP-283 also adds new topic and broker configurations <code>message.downconversion.enable</code>
and <code>log.message.downconversion.enable</code> respectively
        to control whether down-conversion is enabled. When disabled, broker does not perform
any down-conversion and instead sends an <code>UNSUPPORTED_VERSION</code>
        error to the client.</p></li>
+    <li>Dynamic broker configuration options can be stored in ZooKeeper using kafka-configs.sh
before brokers are started.
+        This option can be used to avoid storing clear passwords in server.properties as
all password configs may be stored encrypted in ZooKeeper.</li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol
Versions</a></h5>

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message