kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7800; Dynamic log levels admin API (KIP-412)
Date Fri, 02 Aug 2019 18:52:00 GMT
This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a99e011  KAFKA-7800; Dynamic log levels admin API (KIP-412)
a99e011 is described below

commit a99e0111114d1cb8c762494ac195cf84e6425bb3
Author: Stanislav Kozlovski <familyguyuser192@windowslive.com>
AuthorDate: Fri Aug 2 11:51:35 2019 -0700

    KAFKA-7800; Dynamic log levels admin API (KIP-412)
    
    <!--
    Are there any breaking changes?  If so, this is a major release; make sure '#major' is in at least one
    commit message to get CI to bump the major version.  This will prevent automatic downstream dependency
    bumping / consuming.  For more information about semantic versioning see: https://semver.org/
    
    Suggested PR template: Fill/delete/add sections as needed. Optionally delete any commented block.
    -->
    What
    ----
    <!--
    Briefly describe **what** you have changed and **why**.
    Optionally include implementation strategy.
    -->
    
    References
    ----------
    [**KIP-412**](https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels)
    [**KAFKA-7800**](https://issues.apache.org/jira/browse/KAFKA-7800)
    [**Discussion Thread**](http://mail-archives.apache.org/mod_mbox/kafka-dev/201901.mbox/%3CCANZZNGyeVw8q%3Dx9uOQS-18wL3FEmnOwpBnpJ9x3iMLdXY3gEug%40mail.gmail.com%3E)
    [**Vote Thread**](http://mail-archives.apache.org/mod_mbox/kafka-dev/201902.mbox/%3CCANZZNGzpTJg5YX1Gpe5S%3DHSr%3DXGvmxvYLTdA3jWq_qwH-UvorQ%40mail.gmail.com%3E)
    
    <!--
    Copy&paste links: to Jira ticket, other PRs, issues, Slack conversations...
    For code bumps: link to PR, tag or GitHub `/compare/master...master`
    -->
    
    Test&Review
    ------------
    Test cases covered:
    * DescribeConfigs
    * Alter the log level with and without validateOnly, validate the results with DescribeConfigs
    
    Open questions / Follow ups
    --------------------------
    If you're a reviewer, I'd appreciate your thoughts on the following open questions:
    1. Should we add synchronization to the Log4jController methods? - Seems like we don't get much value from it
    2. Should we instantiate a new Log4jController instead of it having static methods? - All operations are stateless, so I thought static methods would do well
    3. A logger which does not have a set value returns "null" (as seen in the unit tests). Should we just return the Root logger's level?
    
    Author: Stanislav Kozlovski <familyguyuser192@windowslive.com>
    
    Reviewers: Gwen Shapira
    
    Closes #6903 from stanislavkozlovski/KAFKA-7800-dynamic-log-levels-admin-ap
---
 .../apache/kafka/clients/admin/ConfigEntry.java    |   1 +
 .../kafka/clients/admin/KafkaAdminClient.java      |  17 +-
 .../apache/kafka/common/config/ConfigResource.java |   2 +-
 .../apache/kafka/common/config/LogLevelConfig.java |  71 +++++++
 .../common/requests/DescribeConfigsResponse.java   |   3 +-
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 138 +++++++++----
 .../src/main/scala/kafka/server/AdminManager.scala |  69 ++++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   8 +-
 .../main/scala/kafka/utils/Log4jController.scala   |  92 ++++++---
 .../kafka/api/AdminClientIntegrationTest.scala     | 229 ++++++++++++++++++++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  34 ++-
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 164 +++++++++++++--
 12 files changed, 719 insertions(+), 109 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
index 7775b6a..42cc627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -189,6 +189,7 @@ public class ConfigEntry {
      */
     public enum ConfigSource {
         DYNAMIC_TOPIC_CONFIG,           // dynamic topic config that is configured for a specific topic
+        DYNAMIC_BROKER_LOGGER_CONFIG,   // dynamic broker logger config that is configured for a specific broker
         DYNAMIC_BROKER_CONFIG,          // dynamic broker config that is configured for a specific broker
         DYNAMIC_DEFAULT_BROKER_CONFIG,  // dynamic broker config that is configured as default for all brokers in the cluster
         STATIC_BROKER_CONFIG,           // static broker config provided as broker properties at start up (e.g. server.properties file)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 227a03b..8092eec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1762,7 +1762,7 @@ public class KafkaAdminClient extends AdminClient {
         final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size());
 
         for (ConfigResource resource : configResources) {
-            if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+            if (dependsOnSpecificNode(resource)) {
                 brokerFutures.put(resource, new KafkaFutureImpl<>());
                 brokerResources.add(resource);
             } else {
@@ -1887,6 +1887,9 @@ public class KafkaAdminClient extends AdminClient {
             case STATIC_BROKER_CONFIG:
                 configSource = ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG;
                 break;
+            case DYNAMIC_BROKER_LOGGER_CONFIG:
+                configSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG;
+                break;
             case DEFAULT_CONFIG:
                 configSource = ConfigEntry.ConfigSource.DEFAULT_CONFIG;
                 break;
@@ -1906,7 +1909,7 @@ public class KafkaAdminClient extends AdminClient {
         final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
 
         for (ConfigResource resource : configs.keySet()) {
-            if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+            if (dependsOnSpecificNode(resource)) {
                 NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name()));
                 allFutures.putAll(alterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
             } else
@@ -1971,7 +1974,7 @@ public class KafkaAdminClient extends AdminClient {
         final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
 
         for (ConfigResource resource : configs.keySet()) {
-            if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+            if (dependsOnSpecificNode(resource)) {
                 NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name()));
                 allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
             } else
@@ -3070,4 +3073,12 @@ public class KafkaAdminClient extends AdminClient {
 
         return new ElectLeadersResult(electionFuture);
     }
+
+    /**
+     * Returns a boolean indicating whether the resource needs to go to a specific node
+     */
+    private boolean dependsOnSpecificNode(ConfigResource resource) {
+        return (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault())
+                || resource.type() == ConfigResource.Type.BROKER_LOGGER;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
index 5343a6b..8870238 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -33,7 +33,7 @@ public final class ConfigResource {
      * Type of resource.
      */
     public enum Type {
-        BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
+        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
 
         private static final Map<Byte, Type> TYPES = Collections.unmodifiableMap(
             Arrays.stream(values()).collect(Collectors.toMap(Type::id, Function.identity()))
diff --git a/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java b/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java
new file mode 100644
index 0000000..fe7e2eb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class holds definitions for log level configurations related to Kafka's application logging. See KIP-412 for additional information
+ */
+public class LogLevelConfig {
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    /**
+     * The <code>FATAL</code> level designates a very severe error
+     * that will lead the Kafka broker to abort.
+     */
+    public static final String FATAL_LOG_LEVEL = "FATAL";
+
+    /**
+     * The <code>ERROR</code> level designates error events that
+     * might still allow the broker to continue running.
+     */
+    public static final String ERROR_LOG_LEVEL = "ERROR";
+
+    /**
+     * The <code>WARN</code> level designates potentially harmful situations.
+     */
+    public static final String WARN_LOG_LEVEL = "WARN";
+
+    /**
+     * The <code>INFO</code> level designates informational messages
+     * that highlight normal Kafka events at a coarse-grained level
+     */
+    public static final String INFO_LOG_LEVEL = "INFO";
+
+    /**
+     * The <code>DEBUG</code> level designates fine-grained
+     * informational events that are most useful to debug Kafka
+     */
+    public static final String DEBUG_LOG_LEVEL = "DEBUG";
+
+    /**
+     * The <code>TRACE</code> level designates finer-grained
+     * informational events than the <code>DEBUG</code> level.
+     */
+    public static final String TRACE_LOG_LEVEL = "TRACE";
+
+    public static final Set<String> VALID_LOG_LEVELS = new HashSet<>(Arrays.asList(
+            FATAL_LOG_LEVEL, ERROR_LOG_LEVEL, WARN_LOG_LEVEL,
+            INFO_LOG_LEVEL, DEBUG_LOG_LEVEL, TRACE_LOG_LEVEL
+    ));
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 51c35d5..6d424f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -179,7 +179,8 @@ public class DescribeConfigsResponse extends AbstractResponse {
         DYNAMIC_BROKER_CONFIG((byte) 2),
         DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
         STATIC_BROKER_CONFIG((byte) 4),
-        DEFAULT_CONFIG((byte) 5);
+        DEFAULT_CONFIG((byte) 5),
+        DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6);
 
         final byte id;
         private static final ConfigSource[] VALUES = values();
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 7edc4a4..781cc1a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,8 +28,8 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
+import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.security.JaasUtils
@@ -41,7 +41,6 @@ import scala.collection._
 
 
 /**
- * This script can be used to change configs for topics/clients/brokers dynamically
  * This script can be used to change configs for topics/clients/users/brokers dynamically
  * An entity described or altered by the command may be one of:
  * <ul>
@@ -50,12 +49,15 @@ import scala.collection._
  *     <li> user: --entity-type users --entity-name <user-principal>
  *     <li> <user, client>: --entity-type users --entity-name <user-principal> --entity-type clients --entity-name <client-id>
  *     <li> broker: --entity-type brokers --entity-name <broker>
+ *     <li> broker-logger: --entity-type broker-loggers --entity-name <broker>
  * </ul>
  * --entity-default may be used instead of --entity-name when describing or altering default configuration for users and clients.
  *
  */
 object ConfigCommand extends Config {
 
+  val BrokerLoggerConfigType = "broker-loggers"
+  val BrokerSupportedConfigTypes = Seq(ConfigType.Broker, BrokerLoggerConfigType)
   val DefaultScramIterations = 4096
   // Dynamic broker configs can only be updated using the new AdminClient once brokers have started
   // so that configs may be fully validated. Prior to starting brokers, updates may be performed using
@@ -274,49 +276,61 @@ object ConfigCommand extends Config {
     val adminClient = JAdminClient.create(props)
     val entityName = if (opts.options.has(opts.entityName))
       opts.options.valueOf(opts.entityName)
-    else if (opts.options.has(opts.entityDefault))
+    else // default entity
       ""
-    else
-      throw new IllegalArgumentException("At least one of --entity-name or --entity-default must be specified with --bootstrap-server")
 
     val entityTypes = opts.options.valuesOf(opts.entityType).asScala
     if (entityTypes.size != 1)
-      throw new IllegalArgumentException("Exactly one --entity-type must be specified with --bootstrap-server")
-    if (entityTypes.head != ConfigType.Broker)
-      throw new IllegalArgumentException(s"--zookeeper option must be specified for entity-type $entityTypes")
+      throw new IllegalArgumentException(s"Exactly one --entity-type (out of ${BrokerSupportedConfigTypes.mkString(",")}) must be specified with --bootstrap-server")
 
     try {
       if (opts.options.has(opts.alterOpt))
-        alterBrokerConfig(adminClient, opts, entityName)
+        alterBrokerConfig(adminClient, opts, entityTypes.head, entityName)
       else if (opts.options.has(opts.describeOpt))
-        describeBrokerConfig(adminClient, opts, entityName)
+        describeBrokerConfig(adminClient, opts, entityTypes.head, entityName)
     } finally {
       adminClient.close()
     }
   }
 
-  private[admin] def alterBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, entityName: String) {
+  private[admin] def alterBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions,
+                                       entityType: String, entityName: String) {
     val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
-    // compile the final set of configs
-    val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
-    val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false)
+    if (entityType == ConfigType.Broker) {
+      val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
+      val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false)
         .map { entry => (entry.name, entry) }.toMap
 
-    // fail the command if any of the configs to be deleted does not exist
-    val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-    if (invalidConfigs.nonEmpty)
-      throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
-
-    val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-    val sensitiveEntries = newEntries.filter(_._2.value == null)
-    if (sensitiveEntries.nonEmpty)
-      throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
-    val newConfig = new JConfig(newEntries.asJava.values)
-
-    val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-    adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+      // fail the command if any of the configs to be deleted does not exist
+      val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+      if (invalidConfigs.nonEmpty)
+        throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+
+      val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
+      val sensitiveEntries = newEntries.filter(_._2.value == null)
+      if (sensitiveEntries.nonEmpty)
+        throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
+      val newConfig = new JConfig(newEntries.asJava.values)
+
+      val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+      adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+    } else if (entityType == BrokerLoggerConfigType) {
+      val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName)
+      val validLoggers = brokerLoggerConfigs(adminClient, entityName).map(_.name)
+      // fail the command if any of the configured broker loggers do not exist
+      val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
+      if (invalidBrokerLoggers.nonEmpty)
+        throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}")
+
+      val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+      val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
+        ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+      ).asJavaCollection
+
+      adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+    }
 
     if (entityName.nonEmpty)
       println(s"Completed updating config for broker: $entityName.")
@@ -324,8 +338,13 @@ object ConfigCommand extends Config {
       println(s"Completed updating default config for brokers in the cluster,")
   }
 
-  private def describeBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, entityName: String) {
-    val configs = brokerConfig(adminClient, entityName, includeSynonyms = true)
+  private def describeBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions,
+                                   entityType: String, entityName: String) {
+    val configs = if (entityType == ConfigType.Broker)
+      brokerConfig(adminClient, entityName, includeSynonyms = true)
+    else // broker logger
+      brokerLoggerConfigs(adminClient, entityName)
+
     if (entityName.nonEmpty)
       println(s"Configs for broker $entityName are:")
     else
@@ -349,6 +368,15 @@ object ConfigCommand extends Config {
       .toSeq
   }
 
+  /**
+    * Returns all the valid broker logger configurations
+    */
+  private def brokerLoggerConfigs(adminClient: Admin, entityName: String): Seq[ConfigEntry] = {
+    val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName)
+    val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS)
+    configs.get(configResource).entries.asScala.toSeq
+  }
+
   case class Entity(entityType: String, sanitizedName: Option[String]) {
     val entityPath = sanitizedName match {
       case Some(n) => entityType + "/" + n
@@ -445,7 +473,7 @@ object ConfigCommand extends Config {
     if (opts.options.has(opts.alterOpt) && names.size != types.size)
       throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
 
-    val reverse = types.size == 2 && types(0) == ConfigType.Client
+    val reverse = types.size == 2 && types.head == ConfigType.Client
     val entityTypes = if (reverse) types.reverse else types
     val sortedNames = (if (reverse && names.length == 2) names.reverse else names).iterator
 
@@ -483,7 +511,7 @@ object ConfigCommand extends Config {
       .ofType(classOf[String])
     val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
     val describeOpt = parser.accepts("describe", "List configs for the given entity.")
-    val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers)")
+    val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers)")
             .withRequiredArg
             .ofType(classOf[String])
     val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id)")
@@ -514,36 +542,58 @@ object ConfigCommand extends Config {
       val actions = Seq(alterOpt, describeOpt).count(options.has _)
       if(actions != 1)
         CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter")
-
       // check required args
       CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
+
       val entityTypeVals = options.valuesOf(entityType).asScala
+      val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt))
+        (BrokerSupportedConfigTypes, "--bootstrap-server")
+      else
+        (ConfigType.all, "--zookeeper")
+
+      entityTypeVals.foreach(entityTypeVal =>
+        if (!allowedEntityTypes.contains(entityTypeVal))
+          throw new IllegalArgumentException(s"Invalid entity-type $entityTypeVal, --entity-type must be one of ${allowedEntityTypes.mkString(",")} with the $connectOptString argument")
+      )
+      if (entityTypeVals.isEmpty)
+        throw new IllegalArgumentException("At least one --entity-type must be specified")
+      else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client)))
+        throw new IllegalArgumentException(s"Only '${ConfigType.User}' and '${ConfigType.Client}' entity types may be specified together")
 
-      if (options.has(bootstrapServerOpt) == options.has(zkConnectOpt))
+      if (!options.has(bootstrapServerOpt) && !options.has(zkConnectOpt))
+        throw new IllegalArgumentException("One of the required --bootstrap-server or --zookeeper arguments must be specified")
+      else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt))
         throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified")
+      else if (options.has(bootstrapServerOpt) && !options.has(entityName) && !options.has(entityDefault))
+        throw new IllegalArgumentException(s"At least one of --entity-name or --entity-default must be specified with --bootstrap-server")
+
+      if (options.has(entityName) && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) {
+        val brokerId = options.valueOf(entityName)
+        try brokerId.toInt catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid integer broker id , but it is: $brokerId")
+        }
+      }
+
       if (entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Topic) || entityTypeVals.contains(ConfigType.User))
         CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
-      if(options.has(alterOpt)) {
+
+      if (options.has(describeOpt) && entityTypeVals.contains(BrokerLoggerConfigType) && !options.has(entityName))
+        throw new IllegalArgumentException(s"--entity-name must be specified with --describe of ${entityTypeVals.mkString(",")}")
+
+      if (options.has(alterOpt)) {
         if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Broker)) {
           if (!options.has(entityName) && !options.has(entityDefault))
             throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users, clients or brokers")
         } else if (!options.has(entityName))
-            throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals}")
+          throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals.mkString(",")}")
 
         val isAddConfigPresent: Boolean = options.has(addConfig)
         val isDeleteConfigPresent: Boolean = options.has(deleteConfig)
         if(! isAddConfigPresent && ! isDeleteConfigPresent)
           throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter")
       }
-      entityTypeVals.foreach(entityTypeVal =>
-        if (!ConfigType.all.contains(entityTypeVal))
-          throw new IllegalArgumentException(s"Invalid entity-type ${entityTypeVal}, --entity-type must be one of ${ConfigType.all}")
-      )
-      if (entityTypeVals.isEmpty)
-        throw new IllegalArgumentException("At least one --entity-type must be specified")
-      else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client)))
-        throw new IllegalArgumentException(s"Only '${ConfigType.User}' and '${ConfigType.Client}' entity types may be specified together")
     }
   }
 }
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 1ed55fb..1daaeec 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -21,13 +21,14 @@ import java.util.{Collections, Properties}
 import kafka.admin.{AdminOperationException, AdminUtils}
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
+import kafka.utils.Log4jController
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.AlterConfigOp
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.ConfigDef.ConfigKey
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig}
 import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
@@ -347,8 +348,16 @@ class AdminManager(val config: KafkaConfig,
               createResponseConfig(allConfigs(config),
                 createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms))
             else
-              throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received $resource.name")
+              throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.name}")
 
+          case ConfigResource.Type.BROKER_LOGGER =>
+            if (resource.name == null || resource.name.isEmpty)
+              throw new InvalidRequestException("Broker id must not be empty")
+            else if (resourceNameToBrokerId(resource.name) != config.brokerId)
+              throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.name}")
+            else
+              createResponseConfig(Log4jController.loggers,
+                (name, value) => new DescribeConfigsResponse.ConfigEntry(name, value.toString, ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, false, false, List.empty.asJava))
           case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
         }
         resource -> resourceConfig
@@ -428,13 +437,24 @@ class AdminManager(val config: KafkaConfig,
     resource -> ApiError.NONE
   }
 
+  private def alterLogLevelConfigs(alterConfigOps: List[AlterConfigOp]): Unit = {
+    alterConfigOps.foreach { alterConfigOp =>
+      val loggerName = alterConfigOp.configEntry().name()
+      val logLevel = alterConfigOp.configEntry().value()
+      alterConfigOp.opType() match {
+        case OpType.SET => Log4jController.logLevel(loggerName, logLevel)
+        case OpType.DELETE => Log4jController.unsetLogLevel(loggerName)
+      }
+    }
+  }
+
   private def getBrokerId(resource: ConfigResource) = {
     if (resource.name == null || resource.name.isEmpty)
       None
     else {
       val id = resourceNameToBrokerId(resource.name)
       if (id != this.config.brokerId)
-        throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name")
+        throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received ${resource.name}")
       Some(id)
     }
   }
@@ -451,7 +471,7 @@ class AdminManager(val config: KafkaConfig,
   def incrementalAlterConfigs(configs: Map[ConfigResource, List[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
     configs.map { case (resource, alterConfigOps) =>
       try {
-        //throw InvalidRequestException if any duplicate keys
+        // throw InvalidRequestException if any duplicate keys
         val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry().name())
           .mapValues(_.size).filter(_._2 > 1).keys.toSet
         if (duplicateKeys.nonEmpty)
@@ -475,6 +495,14 @@ class AdminManager(val config: KafkaConfig,
             val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
             prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
             alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
+
+          case ConfigResource.Type.BROKER_LOGGER =>
+            getBrokerId(resource)
+            validateLogLevelConfigs(alterConfigOps)
+
+            if (!validateOnly)
+              alterLogLevelConfigs(alterConfigOps)
+            resource -> ApiError.NONE
           case resourceType =>
             throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
         }
@@ -495,6 +523,35 @@ class AdminManager(val config: KafkaConfig,
     }.toMap
   }
 
+  private def validateLogLevelConfigs(alterConfigOps: List[AlterConfigOp]): Unit = {
+    def validateLoggerNameExists(loggerName: String): Unit = {
+      if (!Log4jController.loggerExists(loggerName))
+        throw new ConfigException(s"Logger $loggerName does not exist!")
+    }
+
+    alterConfigOps.foreach { alterConfigOp =>
+      val loggerName = alterConfigOp.configEntry().name()
+      alterConfigOp.opType() match {
+        case OpType.SET =>
+          validateLoggerNameExists(loggerName)
+          val logLevel = alterConfigOp.configEntry().value()
+          if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
+            val validLevelsStr = LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ")
+            throw new ConfigException(
+              s"Cannot set the log level of $loggerName to $logLevel as it is not a supported log level. " +
+              s"Valid log levels are $validLevelsStr"
+            )
+          }
+        case OpType.DELETE =>
+          validateLoggerNameExists(loggerName)
+          if (loggerName == Log4jController.ROOT_LOGGER)
+            throw new InvalidRequestException(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed")
+        case OpType.APPEND => throw new InvalidRequestException(s"${OpType.APPEND} operation is not allowed for the ${ConfigResource.Type.BROKER_LOGGER} resource")
+        case OpType.SUBTRACT => throw new InvalidRequestException(s"${OpType.SUBTRACT} operation is not allowed for the ${ConfigResource.Type.BROKER_LOGGER} resource")
+      }
+    }
+  }
+
   private def prepareIncrementalConfigs(alterConfigOps: List[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {
 
     def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = {
@@ -512,14 +569,14 @@ class AdminManager(val config: KafkaConfig,
           if (!listType(alterConfigOp.configEntry().name(), configKeys))
             throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry().name()}")
           val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList
-          val newValueList =  oldValueList ::: alterConfigOp.configEntry().value().split(",").toList
+          val newValueList = oldValueList ::: alterConfigOp.configEntry().value().split(",").toList
           configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(","))
         }
         case OpType.SUBTRACT => {
           if (!listType(alterConfigOp.configEntry().name(), configKeys))
             throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry().name()}")
           val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList
-          val newValueList =  oldValueList.diff(alterConfigOp.configEntry().value().split(",").toList)
+          val newValueList = oldValueList.diff(alterConfigOp.configEntry().value().split(",").toList)
           configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(","))
         }
       }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3ec6b23..a88cd92 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2288,6 +2288,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val alterConfigsRequest = request.body[AlterConfigsRequest]
     val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.partition { case (resource, _) =>
       resource.`type` match {
+        case ConfigResource.Type.BROKER_LOGGER =>
+          throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
           authorize(request.session, AlterConfigs, Resource.ClusterResource)
         case ConfigResource.Type.TOPIC =>
@@ -2331,7 +2333,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def configsAuthorizationApiError(session: RequestChannel.Session, resource: ConfigResource): ApiError = {
     val error = resource.`type` match {
-      case ConfigResource.Type.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED
+      case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
       case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
       case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
     }
@@ -2349,7 +2351,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
       resource.`type` match {
-        case ConfigResource.Type.BROKER =>
+        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
           authorize(request.session, AlterConfigs, Resource.ClusterResource)
         case ConfigResource.Type.TOPIC =>
           authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL))
@@ -2370,7 +2372,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val describeConfigsRequest = request.body[DescribeConfigsRequest]
     val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource =>
       resource.`type` match {
-        case ConfigResource.Type.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
+        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
         case ConfigResource.Type.TOPIC =>
           authorize(request.session, DescribeConfigs, Resource(Topic, resource.name, LITERAL))
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/Log4jController.scala
index 95d0733..ba0649c 100755
--- a/core/src/main/scala/kafka/utils/Log4jController.scala
+++ b/core/src/main/scala/kafka/utils/Log4jController.scala
@@ -22,69 +22,95 @@ import java.util.Locale
 
 import org.apache.log4j.{Level, LogManager, Logger}
 
+import scala.collection.mutable
+import scala.collection.JavaConverters._
 
-/**
- * An MBean that allows the user to dynamically alter log4j levels at runtime.
- * The companion object contains the singleton instance of this class and
- * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
- * of the companion object.
- */
-private class Log4jController extends Log4jControllerMBean {
 
-  def getLoggers = {
-    val lst = new util.ArrayList[String]()
-    lst.add("root=" + existingLogger("root").getLevel.toString)
+object Log4jController {
+  val ROOT_LOGGER = "root"
+
+  /**
+    * Returns a map of the log4j loggers and their assigned log level.
+    * If a logger does not have a log level assigned, we return the root logger's log level
+    */
+  def loggers: mutable.Map[String, String] = {
+    val logs = new mutable.HashMap[String, String]()
+    val rootLoggerLvl = existingLogger(ROOT_LOGGER).getLevel.toString
+    logs.put(ROOT_LOGGER, rootLoggerLvl)
+
     val loggers = LogManager.getCurrentLoggers
     while (loggers.hasMoreElements) {
       val logger = loggers.nextElement().asInstanceOf[Logger]
       if (logger != null) {
-        val level =  if (logger != null) logger.getLevel else null
-        lst.add("%s=%s".format(logger.getName, if (level != null) level.toString else "null"))
+        val level = if (logger.getLevel != null) logger.getLevel.toString else rootLoggerLvl
+        logs.put(logger.getName, level)
       }
     }
-    lst
+    logs
   }
 
+  /**
+    * Sets the log level of a particular logger
+    */
+  def logLevel(loggerName: String, logLevel: String): Boolean = {
+    val log = existingLogger(loggerName)
+    if (!loggerName.trim.isEmpty && !logLevel.trim.isEmpty && log != null) {
+      log.setLevel(Level.toLevel(logLevel.toUpperCase(Locale.ROOT)))
+      true
+    }
+    else false
+  }
 
-  private def newLogger(loggerName: String) =
-    if (loggerName == "root")
-      LogManager.getRootLogger
-    else LogManager.getLogger(loggerName)
+  def unsetLogLevel(loggerName: String): Boolean = {
+    val log = existingLogger(loggerName)
+    if (!loggerName.trim.isEmpty && log != null) {
+      log.setLevel(null)
+      true
+    }
+    else false
+  }
 
+  def loggerExists(loggerName: String): Boolean = existingLogger(loggerName) != null
 
   private def existingLogger(loggerName: String) =
-    if (loggerName == "root")
+    if (loggerName == ROOT_LOGGER)
       LogManager.getRootLogger
     else LogManager.exists(loggerName)
+}
 
+/**
+ * An MBean that allows the user to dynamically alter log4j levels at runtime.
+ * The companion object contains the singleton instance of this class and
+ * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
+ * of the companion object.
+ */
+class Log4jController extends Log4jControllerMBean {
 
-  def getLogLevel(loggerName: String) = {
-    val log = existingLogger(loggerName)
+  def getLoggers: util.List[String] = {
+    Log4jController.loggers.map {
+      case (logger, level) => s"$logger=$level"
+    }.toList.asJava
+  }
+
+
+  def getLogLevel(loggerName: String): String = {
+    val log = Log4jController.existingLogger(loggerName)
     if (log != null) {
       val level = log.getLevel
       if (level != null)
         log.getLevel.toString
-      else "Null log level."
+      else
+        Log4jController.existingLogger(Log4jController.ROOT_LOGGER).getLevel.toString
     }
     else "No such logger."
   }
 
-
-  def setLogLevel(loggerName: String, level: String) = {
-    val log = newLogger(loggerName)
-    if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) {
-      log.setLevel(Level.toLevel(level.toUpperCase(Locale.ROOT)))
-      true
-    }
-    else false
-  }
-
+  def setLogLevel(loggerName: String, level: String): Boolean = Log4jController.logLevel(loggerName, level)
 }
 
 
-private trait Log4jControllerMBean {
+trait Log4jControllerMBean {
   def getLoggers: java.util.List[String]
   def getLogLevel(logger: String): String
   def setLogLevel(logger: String, level: String): Boolean
 }
-
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 7f04de1..ff8e379 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -24,12 +24,13 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.{Collections, Properties}
 import java.{time, util}
+
 import kafka.log.LogConfig
 import kafka.security.auth.{Cluster, Group, Topic}
 import kafka.server.{Defaults, KafkaConfig, KafkaServer}
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils._
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Log4jController, Logging, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@@ -40,7 +41,7 @@ import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.TopicPartitionReplica
 import org.apache.kafka.common.acl._
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
@@ -49,6 +50,7 @@ import org.junit.Assert._
 import org.junit.rules.Timeout
 import org.junit.{After, Before, Rule, Test}
 import org.scalatest.Assertions.intercept
+
 import scala.collection.JavaConverters._
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
@@ -68,6 +70,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   def globalTimeout = Timeout.millis(120000)
 
   var client: Admin = null
+  var brokerLoggerConfigResource: ConfigResource = null
+  var changedBrokerLoggers = scala.collection.mutable.Set[String]()
 
   val topic = "topic"
   val partition = 0
@@ -77,10 +81,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   override def setUp(): Unit = {
     super.setUp
     TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    brokerLoggerConfigResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString)
   }
 
   @After
   override def tearDown(): Unit = {
+    teardownBrokerLoggers()
     if (client != null)
       Utils.closeQuietly(client, "AdminClient")
     super.tearDown()
@@ -1819,6 +1825,225 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       classOf[InvalidTopicException])
     client.close()
   }
+
+  @Test
+  def testDescribeConfigsForLog4jLogLevels(): Unit = {
+    client = AdminClient.create(createConfig())
+
+    val loggerConfig = describeBrokerLoggers()
+    val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value()
+    val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica")
+    assertEquals(rootLogLevel, logCleanerLogLevelConfig.value()) // we expect an undefined log level to be the same as the root logger
+    assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name())
+    assertEquals(ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source())
+    assertEquals(false, logCleanerLogLevelConfig.isReadOnly)
+    assertEquals(false, logCleanerLogLevelConfig.isSensitive)
+    assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty)
+  }
+
+  @Test
+  def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = {
+    client = AdminClient.create(createConfig())
+
+    val initialLoggerConfig = describeBrokerLoggers()
+    val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value()
+    assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value())
+    assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.log.LogCleaner").value())
+    assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.server.ReplicaManager").value())
+
+    val newRootLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL
+    val alterRootLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET)
+    ).asJavaCollection
+    // Test validateOnly does not change anything
+    alterBrokerLoggers(alterRootLoggerEntry, validateOnly = true)
+    val validatedLoggerConfig = describeBrokerLoggers()
+    assertEquals(initialRootLogLevel, validatedLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+    assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.controller.KafkaController").value())
+    assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value())
+    assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value())
+    assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+
+    // test that we can change them and unset loggers still use the root's log level
+    alterBrokerLoggers(alterRootLoggerEntry)
+    val changedRootLoggerConfig = describeBrokerLoggers()
+    assertEquals(newRootLogLevel, changedRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+    assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.controller.KafkaController").value())
+    assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.log.LogCleaner").value())
+    assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.server.ReplicaManager").value())
+    assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+
+    // alter the ZK client's logger so we can later test resetting it
+    val alterZKLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET)
+    ).asJavaCollection
+    alterBrokerLoggers(alterZKLoggerEntry)
+    val changedZKLoggerConfig = describeBrokerLoggers()
+    assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+
+    // properly test various set operations and one delete
+    val alterLogLevelsEntries = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", LogLevelConfig.TRACE_LOG_LEVEL), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", ""), AlterConfigOp.OpType.DELETE) // should reset to the root logger level
+    ).asJavaCollection
+    alterBrokerLoggers(alterLogLevelsEntries)
+    val alteredLoggerConfig = describeBrokerLoggers()
+    assertEquals(newRootLogLevel, alteredLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+    assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.controller.KafkaController").value())
+    assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, alteredLoggerConfig.get("kafka.log.LogCleaner").value())
+    assertEquals(LogLevelConfig.TRACE_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ReplicaManager").value())
+    assertEquals(newRootLogLevel, alteredLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+  }
+
+  /**
+    * 1. Assume ROOT logger == TRACE
+    * 2. Change kafka.controller.KafkaController logger to INFO
+    * 3. Unset kafka.controller.KafkaController via AlterConfigOp.OpType.DELETE (resets it to the root logger - TRACE)
+    * 4. Change ROOT logger to ERROR
+    * 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level)
+    */
+  @Test
+  def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = {
+    client = AdminClient.create(createConfig())
+    // step 1 - configure root logger
+    val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
+    val alterRootLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, initialRootLogLevel), AlterConfigOp.OpType.SET)
+    ).asJavaCollection
+    alterBrokerLoggers(alterRootLoggerEntry)
+    val initialLoggerConfig = describeBrokerLoggers()
+    assertEquals(initialRootLogLevel, initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+    assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value())
+
+    // step 2 - change KafkaController logger to INFO
+    val alterControllerLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET)
+    ).asJavaCollection
+    alterBrokerLoggers(alterControllerLoggerEntry)
+    val changedControllerLoggerConfig = describeBrokerLoggers()
+    assertEquals(initialRootLogLevel, changedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+    assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.controller.KafkaController").value())
+
+    // step 3 - unset KafkaController logger
+    val deleteControllerLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", ""), AlterConfigOp.OpType.DELETE)
+    ).asJavaCollection
+    alterBrokerLoggers(deleteControllerLoggerEntry)
+    val deletedControllerLoggerConfig = describeBrokerLoggers()
+    assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+    assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get("kafka.controller.KafkaController").value())
+
+    val newRootLogLevel = LogLevelConfig.ERROR_LOG_LEVEL
+    val newAlterRootLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET)
+    ).asJavaCollection
+    alterBrokerLoggers(newAlterRootLoggerEntry)
+    val newRootLoggerConfig = describeBrokerLoggers()
+    assertEquals(newRootLogLevel, newRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+    assertEquals(newRootLogLevel, newRootLoggerConfig.get("kafka.controller.KafkaController").value())
+  }
+
+  @Test
+  def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(): Unit = {
+    client = AdminClient.create(createConfig())
+    val deleteRootLoggerEntry = Seq(
+      new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE)
+    ).asJavaCollection
+
+    assertTrue(intercept[ExecutionException](alterBrokerLoggers(deleteRootLoggerEntry)).getCause.isInstanceOf[InvalidRequestException])
+  }
+
+  @Test
+  def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(): Unit = {
+    client = AdminClient.create(createConfig())
+    val validLoggerName = "kafka.server.KafkaRequestHandler"
+    val expectedValidLoggerLogLevel = describeBrokerLoggers().get(validLoggerName)
+    def assertLogLevelDidNotChange(): Unit = {
+      assertEquals(
+        expectedValidLoggerLogLevel,
+        describeBrokerLoggers().get(validLoggerName)
+      )
+    }
+
+    val appendLogLevelEntries = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
+      new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.APPEND) // append is not supported
+    ).asJavaCollection
+    assertTrue(intercept[ExecutionException](alterBrokerLoggers(appendLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+    assertLogLevelDidNotChange()
+
+    val subtractLogLevelEntries = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
+      new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SUBTRACT) // subtract is not supported
+    ).asJavaCollection
+    assertTrue(intercept[ExecutionException](alterBrokerLoggers(subtractLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+    assertLogLevelDidNotChange()
+
+    val invalidLogLevelLogLevelEntries = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
+      new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", "OFF"), AlterConfigOp.OpType.SET) // OFF is not a valid log level
+    ).asJavaCollection
+    assertTrue(intercept[ExecutionException](alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+    assertLogLevelDidNotChange()
+
+    val invalidLoggerNameLogLevelEntries = Seq(
+      new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
+      new AlterConfigOp(new ConfigEntry("Some Other LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) // invalid logger name is not supported
+    ).asJavaCollection
+    assertTrue(intercept[ExecutionException](alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+    assertLogLevelDidNotChange()
+  }
+
+  /**
+    * The AlterConfigs API is deprecated and should not support altering log levels
+    */
+  @Test
+  def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
+    client = AdminClient.create(createConfig())
+
+    val alterLogLevelsEntries = Seq(
+      new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL)
+    ).asJavaCollection
+    val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava)
+    assertTrue(intercept[ExecutionException](alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException])
+  }
+
+  def alterBrokerLoggers(entries: util.Collection[AlterConfigOp], validateOnly: Boolean = false): Unit = {
+    if (!validateOnly) {
+      for (entry <- entries.asScala)
+        changedBrokerLoggers.add(entry.configEntry().name())
+    }
+
+    client.incrementalAlterConfigs(Map(brokerLoggerConfigResource -> entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly))
+      .values.get(brokerLoggerConfigResource).get()
+  }
+
+  def describeBrokerLoggers(): Config =
+    client.describeConfigs(Collections.singletonList(brokerLoggerConfigResource)).values.get(brokerLoggerConfigResource).get()
+
+  /**
+    * Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists across test classes.
+    * We need to clean up the changes done while testing.
+    */
+  def teardownBrokerLoggers(): Unit = {
+    if (changedBrokerLoggers.nonEmpty) {
+      val validLoggers = describeBrokerLoggers().entries().asScala.filterNot(_.name().equals(Log4jController.ROOT_LOGGER)).map(_.name).toSet
+      val unsetBrokerLoggersEntries = changedBrokerLoggers
+        .intersect(validLoggers)
+        .map { logger => new AlterConfigOp(new ConfigEntry(logger, ""), AlterConfigOp.OpType.DELETE) }
+        .asJavaCollection
+
+      // ensure that we first reset the root logger to an arbitrary log level. Note that we cannot reset it to its original value
+      alterBrokerLoggers(List(
+        new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, LogLevelConfig.FATAL_LOG_LEVEL), AlterConfigOp.OpType.SET)
+      ).asJavaCollection)
+      alterBrokerLoggers(unsetBrokerLoggersEntries)
+
+      changedBrokerLoggers.clear()
+    }
+  }
 }
 
 object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 8f3f24f..387a7f9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.message.ControlledShutdownRequestData
@@ -101,6 +101,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
   val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
   val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+  val clusterAlterConfigsAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, AlterConfigs)))
   val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)))
   val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
   val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
@@ -215,8 +216,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
     ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error),
     ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())),
-    ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) =>
-      IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error),
+    ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => {
+      val topicResourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic))
+      if (topicResourceError == null)
+        IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId.toString)).error
+      else
+        topicResourceError.error()
+    }),
     ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode())),
     ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode()))
   )
@@ -651,6 +657,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(): Unit = {
+    // A BROKER_LOGGER alter request must fail with no ACLs and pass once the cluster AlterConfigs ACL is added.
+    val loggerOp = new AlterableConfig()
+      .setName("kafka.controller.KafkaController")
+      .setValue(LogLevelConfig.DEBUG_LOG_LEVEL).setConfigOperation(AlterConfigOp.OpType.DELETE.id())
+    val loggerOps = new AlterableConfigCollection
+    loggerOps.add(loggerOp)
+    val brokerLoggerResource = new AlterConfigsResource().setResourceName(brokerId.toString)
+      .setResourceType(ConfigResource.Type.BROKER_LOGGER.id()).setConfigs(loggerOps)
+    val requestData = new IncrementalAlterConfigsRequestData
+    requestData.resources().add(brokerLoggerResource)
+    val alterRequest = new IncrementalAlterConfigsRequest.Builder(requestData).build()
+    val checkedTypes = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
+
+    removeAllAcls()
+    sendRequestAndVerifyResponseError(ApiKeys.INCREMENTAL_ALTER_CONFIGS, alterRequest, checkedTypes, isAuthorized = false)
+
+    addAndVerifyAcls(clusterAlterConfigsAcl(Resource.ClusterResource), Resource.ClusterResource)
+    sendRequestAndVerifyResponseError(ApiKeys.INCREMENTAL_ALTER_CONFIGS, alterRequest, checkedTypes, isAuthorized = true)
+  }
+
+  @Test
   def testOffsetsForLeaderEpochClusterPermission(): Unit = {
     val key = ApiKeys.OFFSET_FOR_LEADER_EPOCH
     val request = offsetsForLeaderEpochRequest
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index bd26a61..e3396bb 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import kafka.api.ApiVersion
 import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{ConfigEntityName, KafkaConfig}
+import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig}
 import kafka.utils.{Exit, Logging}
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin._
@@ -101,33 +101,44 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     testArgumentParse("brokers")
   }
 
-  def testArgumentParse(entityType: String) = {
+  @Test
+  def shouldParseArgumentsForBrokerLoggersEntityType(): Unit = {
+    // zkConfig = false makes testArgumentParse connect with --bootstrap-server instead of --zookeeper.
+    testArgumentParse("broker-loggers", zkConfig = false)
+  }
+
+  def testArgumentParse(entityType: String, zkConfig: Boolean=true): Unit = {
+    val connectOpts = if (zkConfig)
+      ("--zookeeper", zkConnect)
+    else
+      ("--bootstrap-server", "localhost:9092")
+
     // Should parse correctly
-    var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-      "--entity-name", "x",
+    var createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+      "--entity-name", "1",
       "--entity-type", entityType,
       "--describe"))
     createOpts.checkArgs()
 
     // For --alter and added config
-    createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-      "--entity-name", "x",
+    createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+      "--entity-name", "1",
       "--entity-type", entityType,
       "--alter",
       "--add-config", "a=b,c=d"))
     createOpts.checkArgs()
 
     // For alter and deleted config
-    createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-      "--entity-name", "x",
+    createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+      "--entity-name", "1",
       "--entity-type", entityType,
       "--alter",
       "--delete-config", "a,b,c"))
     createOpts.checkArgs()
 
     // For alter and both added, deleted config
-    createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-      "--entity-name", "x",
+    createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+      "--entity-name", "1",
       "--entity-type", entityType,
       "--alter",
       "--add-config", "a=b,c=d",
@@ -143,8 +154,8 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     assertEquals(1, deletedProps.size)
     assertEquals("a", deletedProps.head)
 
-    createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
-      "--entity-name", "x",
+    createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+      "--entity-name", "1",
       "--entity-type", entityType,
       "--alter",
       "--add-config", "a=b,c=,d=e,f="))
@@ -165,6 +176,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailIfBrokerEntityTypeIsNotAnInteger(): Unit = {
+    // "A" is not an integer broker id; the @Test annotation expects IllegalArgumentException.
+    val args = Array("--zookeeper", zkConnect, "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d")
+    ConfigCommand.alterConfig(null, new ConfigCommandOptions(args), new DummyAdminZkClient(zkClient))
+  }
+
   @Test
   def shouldAddClientConfig(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -229,6 +247,71 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def shouldAddBrokerLoggerConfig(): Unit = {
+    // Describe reports all three loggers that verifyAlterBrokerLoggerConfig alters, each currently at INFO.
+    val currentLoggers = List("kafka.log.LogCleaner", "kafka.server.ReplicaManager", "kafka.server.KafkaApi")
+      .map(loggerName => new ConfigEntry(loggerName, "INFO"))
+    val broker = new Node(1, "localhost", 9092)
+    verifyAlterBrokerLoggerConfig(broker, resourceName = "1", entityName = "1",
+      describeConfigEntries = currentLoggers)
+  }
+
+  @Test
+  def testNoSpecifiedEntityOptionWithDescribeBrokersInZKIsAllowed(): Unit = {
+    // No --entity-name / --entity-default is passed; with --zookeeper this must parse and check without throwing.
+    val describeArgs = Array(
+      "--zookeeper", "localhost:9092",
+      "--entity-type", ConfigType.Broker,
+      "--describe")
+    new ConfigCommandOptions(describeArgs).checkArgs()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testNoSpecifiedEntityOptionWithDescribeBrokersInBootstrapServerIsNotAllowed(): Unit = {
+    // No --entity-name / --entity-default is passed; with --bootstrap-server the @Test expects IllegalArgumentException.
+    val describeArgs = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--entity-type", ConfigType.Broker,
+      "--describe")
+    new ConfigCommandOptions(describeArgs).checkArgs()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testEntityDefaultOptionWithDescribeBrokerLoggerIsNotAllowed(): Unit = {
+    // Only argument validation is exercised, so no Node / admin client is constructed.
+    // The enclosing @Test expects IllegalArgumentException when these options are parsed and checked.
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+      "--entity-default",
+      "--describe"
+    )
+    new ConfigCommandOptions(optsList.toArray).checkArgs()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testEntityDefaultOptionWithAlterBrokerLoggerIsNotAllowed(): Unit = {
+    // Only argument validation is exercised, so no Node / admin client is constructed.
+    // The enclosing @Test expects IllegalArgumentException when these options are parsed and checked.
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+      "--entity-default",
+      "--alter",
+      "--add-config", "kafka.log.LogCleaner=DEBUG"
+    )
+    new ConfigCommandOptions(optsList.toArray).checkArgs()
+  }
+
+  @Test(expected = classOf[InvalidConfigurationException])
+  def shouldRaiseInvalidConfigurationExceptionWhenAddingInvalidBrokerLoggerConfig(): Unit = {
+    // verifyAlterBrokerLoggerConfig alters kafka.log.LogCleaner, kafka.server.ReplicaManager and
+    // kafka.server.KafkaApi, but DescribeConfigs is stubbed to report only kafka.server.KafkaApi,
+    // implying that kafka.server.ReplicaManager and kafka.log.LogCleaner are invalid.
+    val onlyKnownLogger = List(new ConfigEntry("kafka.server.KafkaApi", "INFO"))
+    val broker = new Node(1, "localhost", 9092)
+    verifyAlterBrokerLoggerConfig(broker, resourceName = "1", entityName = "1", describeConfigEntries = onlyKnownLogger)
+  }
+
+  @Test
   def shouldAddDefaultBrokerDynamicConfig(): Unit = {
     val node = new Node(1, "localhost", 9092)
     verifyAlterBrokerConfig(node, "", List("--entity-default"))
@@ -274,11 +357,66 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       }
     }
     EasyMock.replay(alterResult, describeResult)
-    ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, resourceName)
+    ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, ConfigType.Broker, resourceName)
     assertEquals(Map("message.max.bytes" -> "10", "num.io.threads" -> "5"), brokerConfigs.toMap)
     EasyMock.reset(alterResult, describeResult)
   }
 
+  def verifyAlterBrokerLoggerConfig(node: Node, resourceName: String, entityName: String,
+                                    describeConfigEntries: List[ConfigEntry]): Unit = {
+    // Runs ConfigCommand.alterBrokerConfig for a broker-logger entity against a stubbed admin client:
+    // describeConfigs reports `describeConfigEntries`, and incrementalAlterConfigs must receive one SET
+    // (kafka.log.LogCleaner=DEBUG) followed by two DELETEs (ReplicaManager, KafkaApi).
+    val alterOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+      "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+      "--alter",
+      "--entity-name", entityName,
+      "--add-config", "kafka.log.LogCleaner=DEBUG",
+      "--delete-config", "kafka.server.ReplicaManager,kafka.server.KafkaApi"))
+    var alterInvoked = false
+
+    val describedResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, resourceName)
+    val describeFuture = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    describeFuture.complete(util.Collections.singletonMap(describedResource, new Config(describeConfigEntries.asJava)))
+    val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+    EasyMock.expect(describeResult.all()).andReturn(describeFuture).once()
+
+    val alterFuture = new KafkaFutureImpl[Void]
+    alterFuture.complete(null)
+    val alterResult: AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+    EasyMock.expect(alterResult.all()).andReturn(alterFuture)
+
+    val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult = {
+        assertEquals(1, resources.size)
+        val requested = resources.iterator.next
+        assertEquals(ConfigResource.Type.BROKER_LOGGER, requested.`type`)
+        assertEquals(resourceName, requested.name)
+        describeResult
+      }
+
+      override def incrementalAlterConfigs(configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]], options: AlterConfigsOptions): AlterConfigsResult = {
+        assertEquals(1, configs.size)
+        val (alteredResource, receivedOps) = configs.asScala.head
+        assertEquals(ConfigResource.Type.BROKER_LOGGER, alteredResource.`type`)
+        assertEquals(3, receivedOps.size)
+
+        val expectedOps = List(
+          new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", "DEBUG"), AlterConfigOp.OpType.SET),
+          new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", ""), AlterConfigOp.OpType.DELETE),
+          new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi", ""), AlterConfigOp.OpType.DELETE)
+        )
+        assertEquals(expectedOps, receivedOps.asScala.toList)
+        alterInvoked = true
+        alterResult
+      }
+    }
+    EasyMock.replay(alterResult, describeResult)
+    ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, ConfigCommand.BrokerLoggerConfigType, resourceName)
+    assertTrue(alterInvoked)
+    EasyMock.reset(alterResult, describeResult)
+  }
+
   @Test
   def shouldSupportCommaSeparatedValues(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,


Mime
View raw message