kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247)
Date Tue, 16 Apr 2019 23:26:46 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b1524c  KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247)
3b1524c is described below

commit 3b1524c5dfd2a94f3fb919dad0de70984963772b
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Wed Apr 17 04:56:33 2019 +0530

    KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247)
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Viktor Somogyi <viktorsomogyi@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../apache/kafka/clients/admin/AdminClient.java    |  46 ++++++
 .../apache/kafka/clients/admin/AlterConfigOp.java  |  96 +++++++++++
 .../kafka/clients/admin/KafkaAdminClient.java      |  90 +++++++++++
 .../org/apache/kafka/common/config/ConfigDef.java  |   4 +
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../kafka/common/requests/AbstractRequest.java     |   2 +
 .../kafka/common/requests/AbstractResponse.java    |   2 +
 .../requests/IncrementalAlterConfigsRequest.java   |  91 +++++++++++
 .../requests/IncrementalAlterConfigsResponse.java  |  99 ++++++++++++
 .../message/IncrementalAlterConfigsRequest.json    |  41 +++++
 .../message/IncrementalAlterConfigsResponse.json   |  36 +++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  56 +++++++
 .../kafka/clients/admin/MockAdminClient.java       |   7 +
 .../kafka/common/requests/RequestResponseTest.java |  35 ++++
 core/src/main/scala/kafka/log/LogConfig.scala      |   2 +
 .../src/main/scala/kafka/server/AdminManager.scala | 180 +++++++++++++++------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  31 ++++
 .../kafka/api/AdminClientIntegrationTest.scala     | 178 ++++++++++++++++++++
 .../kafka/api/AuthorizerIntegrationTest.scala      |  33 +++-
 .../server/DynamicBrokerReconfigurationTest.scala  |  16 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  10 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  28 +++-
 23 files changed, 1023 insertions(+), 68 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ce2706d..ca103a2 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -43,7 +43,7 @@
               files="Sender.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
+              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|AdminClient|KafkaAdminClient).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
               files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index b823cdc..8826f83 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -368,7 +368,9 @@ public abstract class AdminClient implements AutoCloseable {
      * @param configs         The resources with their configs (topic is the only resource type with configs that can
      *                        be updated currently)
      * @return                The AlterConfigsResult
+     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}.
      */
+    @Deprecated
     public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
         return alterConfigs(configs, new AlterConfigsOptions());
     }
@@ -385,10 +387,54 @@ public abstract class AdminClient implements AutoCloseable {
      *                        be updated currently)
      * @param options         The options to use when describing configs
      * @return                The AlterConfigsResult
+     * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}.
      */
+    @Deprecated
     public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
 
     /**
+     * Incrementally updates the configuration for the specified resources with default options.
+     *
+     * This is a convenience method for {@link AdminClient#incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * This operation is supported by brokers with version 2.3.0 or higher.
+     *
+     * @param configs         The resources with their configs
+     * @return                The AlterConfigsResult
+     */
+    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
+        return incrementalAlterConfigs(configs, new AlterConfigsOptions());
+    }
+
+
+    /**
+     * Incrementally update the configuration for the specified resources.
+     *
+     * Updates are not transactional, so they may succeed for some resources while failing for others. The configs for
+     * a particular resource are updated atomically.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@code AlterConfigsResult}:</p>
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   if the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TopicAuthorizationException}
+     *   if the authenticated user didn't have alter access to the Topic.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   if the request details are invalid. e.g., a configuration key was specified more than once for a resource</li>
+     * </ul>
+     *
+     * This operation is supported by brokers with version 2.3.0 or higher.
+     *
+     * @param configs         The resources with their configs
+     * @param options         The options to use when altering configs
+     * @return                The AlterConfigsResult
+     */
+    public abstract AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource,
+            Collection<AlterConfigOp>> configs, AlterConfigsOptions options);
+
+    /**
      * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result
      * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the
      * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java
new file mode 100644
index 0000000..367c842
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigOp.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A class representing an alter configuration entry containing name, value and operation type.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterConfigOp {
+
+    public enum OpType {
+        SET((byte) 0), DELETE((byte) 1), APPEND((byte) 2), SUBTRACT((byte) 3);
+
+        private static final Map<Byte, OpType> OP_TYPES = Collections.unmodifiableMap(
+                Arrays.stream(values()).collect(Collectors.toMap(OpType::id, Function.identity()))
+        );
+
+        private final byte id;
+
+        OpType(final byte id) {
+            this.id = id;
+        }
+
+        public byte id() {
+            return id;
+        }
+
+        public static OpType forId(final byte id) {
+            return OP_TYPES.get(id);
+        }
+    }
+
+    private final ConfigEntry configEntry;
+    private final OpType opType;
+
+    public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
+        this.configEntry = configEntry;
+        this.opType =  operationType;
+    }
+
+    public ConfigEntry configEntry() {
+        return configEntry;
+    };
+
+    public OpType opType() {
+        return opType;
+    };
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final AlterConfigOp that = (AlterConfigOp) o;
+        return opType == that.opType &&
+                Objects.equals(configEntry, that.configEntry);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(opType, configEntry);
+    }
+
+    @Override
+    public String toString() {
+        return "AlterConfigOp{" +
+                "opType=" + opType +
+                ", configEntry=" + configEntry +
+                '}';
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 336597f..23d7fd5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -69,6 +69,10 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -121,6 +125,8 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -1885,6 +1891,7 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
+    @Deprecated
     public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
         final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
         // We must make a separate AlterConfigs request for every BROKER resource we want to alter
@@ -1949,6 +1956,89 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
+    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                 final AlterConfigsOptions options) {
+        final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
+        // We must make a separate IncrementalAlterConfigs request for every BROKER resource we want to alter
+        // and send the request to that specific broker. Other resources are grouped together into
+        // a single request that may be sent to any broker.
+        final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
+
+        for (ConfigResource resource : configs.keySet()) {
+            if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+                NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name()));
+                allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
+            } else
+                unifiedRequestResources.add(resource);
+        }
+        if (!unifiedRequestResources.isEmpty())
+            allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
+
+        return new AlterConfigsResult(new HashMap<>(allFutures));
+    }
+
+    private Map<ConfigResource, KafkaFutureImpl<Void>> incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                    final AlterConfigsOptions options,
+                                                                    Collection<ConfigResource> resources,
+                                                                    NodeProvider nodeProvider) {
+        final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        for (ConfigResource resource : resources)
+            futures.put(resource, new KafkaFutureImpl<>());
+
+        final long now = time.milliseconds();
+        runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
+
+            @Override
+            public AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new IncrementalAlterConfigsRequest.Builder(
+                        toIncrementalAlterConfigsRequestData(resources, configs, options.shouldValidateOnly()));
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                IncrementalAlterConfigsResponse response = (IncrementalAlterConfigsResponse) abstractResponse;
+                Map<ConfigResource, ApiError> errors = IncrementalAlterConfigsResponse.fromResponseData(response.data());
+                for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
+                    KafkaFutureImpl<Void> future = entry.getValue();
+                    ApiException exception = errors.get(entry.getKey()).exception();
+                    if (exception != null) {
+                        future.completeExceptionally(exception);
+                    } else {
+                        future.complete(null);
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        }, now);
+        return futures;
+    }
+
+    private  IncrementalAlterConfigsRequestData toIncrementalAlterConfigsRequestData(final Collection<ConfigResource> resources,
+                                                                   final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                   final boolean validateOnly) {
+        IncrementalAlterConfigsRequestData requestData = new IncrementalAlterConfigsRequestData();
+        requestData.setValidateOnly(validateOnly);
+        for (ConfigResource resource : resources) {
+            AlterableConfigSet alterableConfigSet = new AlterableConfigSet();
+            for (AlterConfigOp configEntry : configs.get(resource))
+                alterableConfigSet.add(new AlterableConfig().
+                        setName(configEntry.configEntry().name()).
+                        setValue(configEntry.configEntry().value()).
+                        setConfigOperation(configEntry.opType().id()));
+
+            AlterConfigsResource alterConfigsResource = new AlterConfigsResource();
+            alterConfigsResource.setResourceType(resource.type().id()).
+                    setResourceName(resource.name()).setConfigs(alterableConfigSet);
+            requestData.resources().add(alterConfigsResource);
+        }
+        return requestData;
+    }
+
+    @Override
     public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options) {
         final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());
 
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 1259882..9f28b56 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -1078,6 +1078,10 @@ public class ConfigDef {
         public boolean hasDefault() {
             return !NO_DEFAULT_VALUE.equals(this.defaultValue);
         }
+
+        public Type type() {
+            return type;
+        }
     }
 
     protected List<String> headers() {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 33d6736..5391135 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -189,7 +191,9 @@ public enum ApiKeys {
     DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
     DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()),
     ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ElectPreferredLeadersRequestData.SCHEMAS,
-            ElectPreferredLeadersResponseData.SCHEMAS);
+            ElectPreferredLeadersResponseData.SCHEMAS),
+    INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
+                              IncrementalAlterConfigsResponseData.SCHEMAS);
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index c069bc9..c199f8e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -231,6 +231,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return new DeleteGroupsRequest(struct, apiVersion);
             case ELECT_PREFERRED_LEADERS:
                 return new ElectPreferredLeadersRequest(struct, apiVersion);
+            case INCREMENTAL_ALTER_CONFIGS:
+                return new IncrementalAlterConfigsRequest(struct, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 50ae0b5..c21fa2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -158,6 +158,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DeleteGroupsResponse(struct);
             case ELECT_PREFERRED_LEADERS:
                 return new ElectPreferredLeadersResponse(struct, version);
+            case INCREMENTAL_ALTER_CONFIGS:
+                return new IncrementalAlterConfigsResponse(struct, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
new file mode 100644
index 0000000..3a87cdb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class IncrementalAlterConfigsRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<IncrementalAlterConfigsRequest> {
+        private final IncrementalAlterConfigsRequestData data;
+
+        public Builder(IncrementalAlterConfigsRequestData data) {
+            super(ApiKeys.INCREMENTAL_ALTER_CONFIGS);
+            this.data = data;
+        }
+
+        @Override
+        public IncrementalAlterConfigsRequest build(short version) {
+            return new IncrementalAlterConfigsRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final IncrementalAlterConfigsRequestData data;
+    private final short version;
+
+    private IncrementalAlterConfigsRequest(IncrementalAlterConfigsRequestData data, short version) {
+        super(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version);
+        this.data = data;
+        this.version = version;
+    }
+
+    IncrementalAlterConfigsRequest(final Struct struct, final short version) {
+        super(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version);
+        this.data = new IncrementalAlterConfigsRequestData(struct, version);
+        this.version = version;
+    }
+
+    public static IncrementalAlterConfigsRequest parse(ByteBuffer buffer, short version) {
+        return new IncrementalAlterConfigsRequest(ApiKeys.INCREMENTAL_ALTER_CONFIGS.parseRequest(version, buffer), version);
+    }
+
+    public IncrementalAlterConfigsRequestData data() {
+        return data;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        return data.toStruct(version);
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(final int throttleTimeMs, final Throwable e) {
+        IncrementalAlterConfigsResponseData response = new IncrementalAlterConfigsResponseData();
+        ApiError apiError = ApiError.fromThrowable(e);
+        for (AlterConfigsResource resource : data.resources()) {
+            response.responses().add(new AlterConfigsResourceResult()
+                    .setResourceName(resource.resourceName())
+                    .setResourceType(resource.resourceType())
+                    .setErrorCode(apiError.error().code())
+                    .setErrorMessage(apiError.message()));
+        }
+        return new IncrementalAlterConfigsResponse(response);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
new file mode 100644
index 0000000..1e5aea1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IncrementalAlterConfigsResponse extends AbstractResponse {
+
+    public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs,
+                                                                     final Map<ConfigResource, ApiError> results) {
+        IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData();
+        responseData.setThrottleTimeMs(requestThrottleMs);
+        for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
+            responseData.responses().add(new AlterConfigsResourceResult().
+                    setResourceName(entry.getKey().name()).
+                    setResourceType(entry.getKey().type().id()).
+                    setErrorCode(entry.getValue().error().code()).
+                    setErrorMessage(entry.getValue().message()));
+        }
+        return responseData;
+    }
+
+    public static Map<ConfigResource, ApiError> fromResponseData(final IncrementalAlterConfigsResponseData data) {
+        Map<ConfigResource, ApiError> map = new HashMap<>();
+        for (AlterConfigsResourceResult result : data.responses()) {
+            map.put(new ConfigResource(ConfigResource.Type.forId(result.resourceType()), result.resourceName()),
+                    new ApiError(Errors.forCode(result.errorCode()), result.errorMessage()));
+        }
+        return map;
+    }
+
+    private final IncrementalAlterConfigsResponseData data;
+
+    public IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponseData data) {
+        this.data = data;
+    }
+
+    public IncrementalAlterConfigsResponse(final Struct struct, final short version) {
+        this.data = new IncrementalAlterConfigsResponseData(struct, version);
+    }
+
+    public IncrementalAlterConfigsResponseData data() {
+        return data;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        HashMap<Errors, Integer> counts = new HashMap<>();
+        for (AlterConfigsResourceResult result : data.responses()) {
+            Errors error = Errors.forCode(result.errorCode());
+            counts.put(error, counts.getOrDefault(error, 0) + 1);
+        }
+        return counts;
+    }
+
+    @Override
+    protected Struct toStruct(final short version) {
+        return data.toStruct(version);
+    }
+
+    @Override
+    public boolean shouldClientThrottle(short version) {
+        return version >= 0;
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    public static IncrementalAlterConfigsResponse parse(ByteBuffer buffer, short version) {
+        return new IncrementalAlterConfigsResponse(
+                ApiKeys.INCREMENTAL_ALTER_CONFIGS.responseSchema(version).read(buffer), version);
+    }
+}
diff --git a/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json
new file mode 100644
index 0000000..d808c04
--- /dev/null
+++ b/clients/src/main/resources/common/message/IncrementalAlterConfigsRequest.json
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 44,
+  "type": "request",
+  "name": "IncrementalAlterConfigsRequest",
+  "validVersions": "0",
+  "fields": [
+    { "name": "Resources", "type": "[]AlterConfigsResource", "versions": "0+",
+      "about": "The incremental updates for each resource.", "fields": [
+      { "name": "ResourceType", "type": "int8", "versions": "0+", "mapKey": true,
+        "about": "The resource type." },
+      { "name": "ResourceName", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The resource name." },
+      { "name": "Configs", "type": "[]AlterableConfig", "versions": "0+",
+        "about": "The configurations.",  "fields": [
+        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+          "about": "The configuration key name." },
+        { "name": "ConfigOperation", "type": "int8", "versions": "0+", "mapKey": true,
+          "about": "The type (Set, Delete, Append, Subtract) of operation." },
+        { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
+          "about": "The value to set for the configuration key."}
+      ]}
+    ]},
+    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
+      "about": "True if we should validate the request, but not change the configurations."}
+  ]
+}
diff --git a/clients/src/main/resources/common/message/IncrementalAlterConfigsResponse.json b/clients/src/main/resources/common/message/IncrementalAlterConfigsResponse.json
new file mode 100644
index 0000000..71aa997
--- /dev/null
+++ b/clients/src/main/resources/common/message/IncrementalAlterConfigsResponse.json
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 44,
+  "type": "response",
+  "name": "IncrementalAlterConfigsResponse",
+  "validVersions": "0",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "responses", "type": "[]AlterConfigsResourceResult", "versions": "0+",
+      "about": "The responses for each resource.", "fields": [
+      { "name": "ErrorCode", "type": "int16", "versions": "0+",
+        "about": "The resource error code." },
+      { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+",
+        "about": "The resource error message, or null if there was no error." },
+      { "name": "ResourceType", "type": "int8", "versions": "0+",
+        "about": "The resource type." },
+      { "name": "ResourceName", "type": "string", "versions": "0+",
+        "about": "The resource name." }
+    ]}
+  ]
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 220fc50..1367b94 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -59,6 +59,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -78,6 +80,7 @@ import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.ElectPreferredLeadersResponse;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -1203,6 +1206,59 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testIncrementalAlterConfigs()  throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            //test error scenarios
+            IncrementalAlterConfigsResponseData responseData =  new IncrementalAlterConfigsResponseData();
+            responseData.responses().add(new AlterConfigsResourceResult()
+                    .setResourceName("")
+                    .setResourceType(ConfigResource.Type.BROKER.id())
+                    .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
+                    .setErrorMessage("authorization error"));
+
+            responseData.responses().add(new AlterConfigsResourceResult()
+                    .setResourceName("topic1")
+                    .setResourceType(ConfigResource.Type.TOPIC.id())
+                    .setErrorCode(Errors.INVALID_REQUEST.code())
+                    .setErrorMessage("Config value append is not allowed for config"));
+
+            env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData));
+
+            ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
+            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic1");
+
+            AlterConfigOp alterConfigOp1 = new AlterConfigOp(
+                    new ConfigEntry("log.segment.bytes", "1073741"),
+                    AlterConfigOp.OpType.SET);
+
+            AlterConfigOp alterConfigOp2 = new AlterConfigOp(
+                    new ConfigEntry("compression.type", "gzip"),
+                    AlterConfigOp.OpType.APPEND);
+
+            final Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
+            configs.put(brokerResource, Collections.singletonList(alterConfigOp1));
+            configs.put(topicResource, Collections.singletonList(alterConfigOp2));
+
+            AlterConfigsResult result = env.adminClient().incrementalAlterConfigs(configs);
+            TestUtils.assertFutureError(result.values().get(brokerResource), ClusterAuthorizationException.class);
+            TestUtils.assertFutureError(result.values().get(topicResource), InvalidRequestException.class);
+
+            // Test a call where there are no errors.
+            responseData =  new IncrementalAlterConfigsResponseData();
+            responseData.responses().add(new AlterConfigsResourceResult()
+                    .setResourceName("")
+                    .setResourceType(ConfigResource.Type.BROKER.id())
+                    .setErrorCode(Errors.NONE.code())
+                    .setErrorMessage(ApiError.NONE.message()));
+
+            env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData));
+            env.adminClient().incrementalAlterConfigs(Collections.singletonMap(brokerResource, asList(alterConfigOp1))).all().get();
+        }
+    }
+
     @SafeVarargs
     private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
         for (T element : elements) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index b669a32..9709372 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -375,11 +375,18 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
+    @Deprecated
     public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
     @Override
+    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                      AlterConfigsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
     public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ca695c7..e595195 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -62,6 +62,12 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -338,6 +344,9 @@ public class RequestResponseTest {
         checkRequest(createElectPreferredLeadersRequestNullPartitions());
         checkErrorResponse(createElectPreferredLeadersRequest(), new UnknownServerException());
         checkResponse(createElectPreferredLeadersResponse(), 0);
+        checkRequest(createIncrementalAlterConfigsRequest());
+        checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException());
+        checkResponse(createIncrementalAlterConfigsResponse(), 0);
     }
 
     @Test
@@ -1477,4 +1486,30 @@ public class RequestResponseTest {
         return new ElectPreferredLeadersResponse(data);
     }
 
+    private IncrementalAlterConfigsRequest createIncrementalAlterConfigsRequest() {
+        IncrementalAlterConfigsRequestData data = new IncrementalAlterConfigsRequestData();
+        AlterableConfig alterableConfig = new AlterableConfig()
+                .setName("retention.ms")
+                .setConfigOperation((byte) 0)
+                .setValue("100");
+        AlterableConfigSet alterableConfigs = new AlterableConfigSet();
+        alterableConfigs.add(alterableConfig);
+
+        data.resources().add(new AlterConfigsResource()
+                .setResourceName("testtopic")
+                .setResourceType(ResourceType.TOPIC.code())
+                .setConfigs(alterableConfigs));
+        return new IncrementalAlterConfigsRequest.Builder(data).build((short) 0);
+    }
+
+    private IncrementalAlterConfigsResponse createIncrementalAlterConfigsResponse() {
+        IncrementalAlterConfigsResponseData data = new IncrementalAlterConfigsResponseData();
+
+        data.responses().add(new AlterConfigsResourceResult()
+                .setResourceName("testtopic")
+                .setResourceType(ResourceType.TOPIC.code())
+                .setErrorCode(Errors.INVALID_REQUEST.code())
+                .setErrorMessage("Duplicate Keys"));
+        return new IncrementalAlterConfigsResponse(data);
+    }
 }
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index ab58949..c1e5c62 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -297,6 +297,8 @@ object LogConfig {
         throw new InvalidConfigurationException(s"Unknown topic config name: $name")
   }
 
+  private[kafka] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala
+
   /**
    * Check that the given properties contain only valid log config names and that all values can be parsed and are valid
    */
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 0cdaad6..d424700 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -24,6 +24,9 @@ import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.internals.Topic
@@ -38,7 +41,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, Describe
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
 
-import scala.collection.{mutable, _}
+import scala.collection.{Map, mutable, _}
 import scala.collection.JavaConverters._
 
 class AdminManager(val config: KafkaConfig,
@@ -364,59 +367,113 @@ class AdminManager(val config: KafkaConfig,
   def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
     configs.map { case (resource, config) =>
 
-      def validateConfigPolicy(resourceType: ConfigResource.Type): Unit = {
-        alterConfigPolicy match {
-          case Some(policy) =>
-            val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
-            policy.validate(new AlterConfigPolicy.RequestMetadata(
-              new ConfigResource(resourceType, resource.name), configEntriesMap.asJava))
-          case None =>
-        }
-      }
       try {
+        val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
+
+        val configProps = new Properties
+        config.entries.asScala.foreach { configEntry =>
+          configProps.setProperty(configEntry.name, configEntry.value)
+        }
+
         resource.`type` match {
-          case ConfigResource.Type.TOPIC =>
-            val topic = resource.name
+          case ConfigResource.Type.TOPIC => alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)
+          case ConfigResource.Type.BROKER => alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
+          case resourceType =>
+            throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
+        }
+      } catch {
+        case e @ (_: ConfigException | _: IllegalArgumentException) =>
+          val message = s"Invalid config value for resource $resource: ${e.getMessage}"
+          info(message)
+          resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
+        case e: Throwable =>
+          // Log client errors at a lower level than unexpected exceptions
+          val message = s"Error processing alter configs request for resource $resource, config $config"
+          if (e.isInstanceOf[ApiException])
+            info(message, e)
+          else
+            error(message, e)
+          resource -> ApiError.fromThrowable(e)
+      }
+    }.toMap
+  }
 
-            val properties = new Properties
-            config.entries.asScala.foreach { configEntry =>
-              properties.setProperty(configEntry.name, configEntry.value)
-            }
+  /**
+   * Validates the given topic configuration (including the alter-config policy, when one is
+   * configured) and, unless `validateOnly` is set, applies it through `adminZkClient`.
+   *
+   * @param resource         the topic resource; `resource.name` is used as the topic name
+   * @param validateOnly     if true, only validation is performed and nothing is changed
+   * @param configProps      the configuration to validate and apply
+   * @param configEntriesMap the name -> value entries handed to the alter-config policy
+   * @return the resource paired with `ApiError.NONE`; any exception thrown during validation
+   *         or update propagates to the caller
+   */
+  private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean,
+                                configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
+    val topic = resource.name
+    adminZkClient.validateTopicConfig(topic, configProps)
+    validateConfigPolicy(resource, configEntriesMap)
+    if (!validateOnly) {
+      // Log the properties being applied. A bare `$config` here would resolve to the
+      // AdminManager's KafkaConfig constructor field rather than to the altered topic config.
+      info(s"Updating topic $topic with new configuration $configProps")
+      adminZkClient.changeTopicConfig(topic, configProps)
+    }
 
-            adminZkClient.validateTopicConfig(topic, properties)
-            validateConfigPolicy(ConfigResource.Type.TOPIC)
-            if (!validateOnly) {
-              info(s"Updating topic $topic with new configuration $config")
-              adminZkClient.changeTopicConfig(topic, properties)
-            }
+    resource -> ApiError.NONE
+  }
 
-            resource -> ApiError.NONE
+  private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean,
+                                 configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
+    val brokerId = getBrokerId(resource)
+    val perBrokerConfig = brokerId.nonEmpty
+    this.config.dynamicConfig.validate(configProps, perBrokerConfig)
+    validateConfigPolicy(resource, configEntriesMap)
+    if (!validateOnly) {
+      if (perBrokerConfig)
+        this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+      adminZkClient.changeBrokerConfig(brokerId,
+        this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
+    }
 
-          case ConfigResource.Type.BROKER =>
-            val brokerId = if (resource.name == null || resource.name.isEmpty)
-              None
-            else {
-              val id = resourceNameToBrokerId(resource.name)
-              if (id != this.config.brokerId)
-                throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name")
-              Some(id)
-            }
-            val configProps = new Properties
-            config.entries.asScala.foreach { configEntry =>
-              configProps.setProperty(configEntry.name, configEntry.value)
-            }
+    resource -> ApiError.NONE
+  }
+
+  /**
+   * Resolves the broker id targeted by a broker config resource.
+   *
+   * @param resource the config resource whose name identifies the broker
+   * @return `None` when the resource name is null or empty, otherwise `Some(id)` where `id`
+   *         is the value returned by `resourceNameToBrokerId` for the resource name
+   * @throws InvalidRequestException if the resolved id differs from this broker's id
+   */
+  private def getBrokerId(resource: ConfigResource) = {
+    if (resource.name == null || resource.name.isEmpty)
+      None
+    else {
+      val id = resourceNameToBrokerId(resource.name)
+      if (id != this.config.brokerId)
+        // The braces matter: `$resource.name` would interpolate the whole resource and append ".name".
+        throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received ${resource.name}")
+      Some(id)
+    }
+  }
+
+  /**
+   * Runs the alter-config policy, if one is configured, against the requested change.
+   * Does nothing when no policy is present; whatever the policy throws propagates to the caller.
+   *
+   * @param resource         the resource being altered
+   * @param configEntriesMap the name -> value entries passed to the policy
+   */
+  private def validateConfigPolicy(resource: ConfigResource, configEntriesMap: Map[String, String]): Unit = {
+    alterConfigPolicy.foreach { policy =>
+      val policyResource = new ConfigResource(resource.`type`(), resource.name)
+      val requestMetadata = new AlterConfigPolicy.RequestMetadata(policyResource, configEntriesMap.asJava)
+      policy.validate(requestMetadata)
+    }
+  }
 
+  def incrementalAlterConfigs(configs: Map[ConfigResource, List[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
+    configs.map { case (resource, alterConfigOps) =>
+      try {
+        //throw InvalidRequestException if any duplicate keys
+        val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry().name())
+          .mapValues(_.size).filter(_._2 > 1).keys.toSet
+        if (duplicateKeys.nonEmpty)
+          throw new InvalidRequestException(s"Error due to duplicate config keys : ${duplicateKeys.mkString(",")}")
+
+        val configEntriesMap = alterConfigOps.map(entry => (entry.configEntry().name(), entry.configEntry().value())).toMap
+
+        resource.`type` match {
+          case ConfigResource.Type.TOPIC =>
+            val configProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name)
+            prepareIncrementalConfigs(alterConfigOps, configProps, LogConfig.configKeys)
+            alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)
+
+          case ConfigResource.Type.BROKER =>
+            val brokerId = getBrokerId(resource)
             val perBrokerConfig = brokerId.nonEmpty
-            this.config.dynamicConfig.validate(configProps, perBrokerConfig)
-            validateConfigPolicy(ConfigResource.Type.BROKER)
-            if (!validateOnly) {
-              if (perBrokerConfig)
-                this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
-              adminZkClient.changeBrokerConfig(brokerId,
-                this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
-            }
 
-            resource -> ApiError.NONE
+            val persistentProps = if (perBrokerConfig) adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.get.toString)
+            else adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default)
+
+            val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
+            prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
+            alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
           case resourceType =>
             throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
         }
@@ -427,7 +484,7 @@ class AdminManager(val config: KafkaConfig,
           resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
         case e: Throwable =>
           // Log client errors at a lower level than unexpected exceptions
-          val message = s"Error processing alter configs request for resource $resource, config $config"
+          val message = s"Error processing alter configs request for resource $resource, config $alterConfigOps"
           if (e.isInstanceOf[ApiException])
             info(message, e)
           else
@@ -437,6 +494,37 @@ class AdminManager(val config: KafkaConfig,
     }.toMap
   }
 
+  /**
+   * Applies the given incremental operations, in order, to `configProps` in place.
+   *
+   * SET overwrites the key, DELETE removes it, and APPEND / SUBTRACT add to or remove from the
+   * comma-separated value of a LIST-typed key.
+   *
+   * @param alterConfigOps the operations to apply
+   * @param configProps    the current configuration; mutated to hold the resulting configuration
+   * @param configKeys     the known config definitions, used to check that a key is LIST-typed
+   * @throws InvalidConfigurationException if APPEND or SUBTRACT names a key missing from `configKeys`
+   * @throws InvalidRequestException       if APPEND or SUBTRACT targets a key that is not LIST-typed
+   */
+  private def prepareIncrementalConfigs(alterConfigOps: List[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {
+
+    def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = {
+      // Map.apply throws NoSuchElementException for an unknown key instead of returning null,
+      // so look the key up as an Option to surface the intended error.
+      val configKey = configKeys.get(configName).flatMap(Option(_)).getOrElse(
+        throw new InvalidConfigurationException(s"Unknown topic config name: $configName"))
+      configKey.`type` == ConfigDef.Type.LIST
+    }
+
+    // A value that is absent (null) or empty holds no list elements; splitting it directly
+    // would either throw a NullPointerException or yield a spurious empty element.
+    def toValueList(value: String): List[String] =
+      Option(value).filter(_.nonEmpty).map(_.split(",").toList).getOrElse(Nil)
+
+    alterConfigOps.foreach { alterConfigOp =>
+      val name = alterConfigOp.configEntry().name()
+      val value = alterConfigOp.configEntry().value()
+      alterConfigOp.opType() match {
+        case OpType.SET => configProps.setProperty(name, value)
+        case OpType.DELETE => configProps.remove(name)
+        case OpType.APPEND =>
+          if (!listType(name, configKeys))
+            throw new InvalidRequestException(s"Config value append is not allowed for config key: $name")
+          // NOTE(review): a key that is not currently set is treated as an empty list; the key's
+          // default value is not consulted — confirm against the intended APPEND semantics.
+          val newValueList = toValueList(configProps.getProperty(name)) ::: toValueList(value)
+          configProps.setProperty(name, newValueList.mkString(","))
+        case OpType.SUBTRACT =>
+          if (!listType(name, configKeys))
+            throw new InvalidRequestException(s"Config value subtract is not allowed for config key: $name")
+          val newValueList = toValueList(configProps.getProperty(name)).diff(toValueList(value))
+          configProps.setProperty(name, newValueList.mkString(","))
+      }
+    }
+  }
+
   def shutdown() {
     topicPurgatory.shutdown()
     CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 140fdd4..56adc9b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -39,6 +39,8 @@ import kafka.security.auth.{Resource, _}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.utils.{CoreUtils, Logging}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
@@ -151,6 +153,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
         case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
         case ApiKeys.ELECT_PREFERRED_LEADERS => handleElectPreferredReplicaLeader(request)
+        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -2155,6 +2158,34 @@ class KafkaApis(val requestChannel: RequestChannel,
     new ApiError(error, null)
   }
 
+  def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request): Unit = {
+    val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
+
+    val configs = alterConfigsRequest.data().resources().iterator().asScala.map { alterConfigResource =>
+      val configResource = new ConfigResource(ConfigResource.Type.forId(alterConfigResource.resourceType()), alterConfigResource.resourceName())
+      configResource -> alterConfigResource.configs().iterator().asScala.map {
+        alterConfig => new AlterConfigOp(new ConfigEntry(alterConfig.name(), alterConfig.value()), OpType.forId(alterConfig.configOperation())) }.toList
+    }.toMap
+
+    val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
+      resource.`type` match {
+        case ConfigResource.Type.BROKER =>
+          authorize(request.session, AlterConfigs, Resource.ClusterResource)
+        case ConfigResource.Type.TOPIC =>
+          authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL))
+        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
+      }
+    }
+
+    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data().validateOnly())
+    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+      resource -> configsAuthorizationApiError(request.session, resource)
+    }
+    sendResponseMaybeThrottle(request, requestThrottleMs =>
+      new IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponse.toResponseData(requestThrottleMs,
+        (authorizedResult ++ unauthorizedResult).asJava)))
+  }
+
   def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
     val describeConfigsRequest = request.body[DescribeConfigsRequest]
     val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource =>
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index dbb6213..9e35f40 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1406,6 +1406,184 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(2, currentLeader(partition1))
     assertEquals(2, currentLeader(partition2))
   }
+
+  @Test
+  def testValidIncrementalAlterConfigs(): Unit = { // Covers SET, APPEND, DELETE and SUBTRACT ops plus validateOnly on topic configs
+    client = AdminClient.create(createConfig)
+
+    // Create topic1 with RetentionMsProp and CleanupPolicyProp overrides; topic2 is created without passing any configs
+    val topic1 = "incremental-alter-configs-topic-1"
+    val topic1Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+    val topic1CreateConfigs = new Properties
+    topic1CreateConfigs.setProperty(LogConfig.RetentionMsProp, "60000000")
+    topic1CreateConfigs.setProperty(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    createTopic(topic1, numPartitions = 1, replicationFactor = 1, topic1CreateConfigs)
+
+    val topic2 = "incremental-alter-configs-topic-2"
+    val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+    createTopic(topic2)
+
+    // Alter topic configs: on topic1 SET FlushMsProp, APPEND Delete to CleanupPolicyProp and DELETE RetentionMsProp
+    var topic1AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.FlushMsProp, "1000"), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Delete), AlterConfigOp.OpType.APPEND),
+      new AlterConfigOp(new ConfigEntry(LogConfig.RetentionMsProp, ""), AlterConfigOp.OpType.DELETE)
+    ).asJavaCollection
+
+    val topic2AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "lz4"), AlterConfigOp.OpType.SET)
+    ).asJavaCollection
+
+    var alterResult = client.incrementalAlterConfigs(Map(
+      topic1Resource -> topic1AlterConfigs,
+      topic2Resource -> topic2AlterConfigs
+    ).asJava)
+
+    assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
+    alterResult.all.get
+
+    // Verify that topics were updated correctly
+    var describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
+    var configs = describeResult.all.get
+
+    assertEquals(2, configs.size)
+
+    assertEquals("1000", configs.get(topic1Resource).get(LogConfig.FlushMsProp).value)
+    assertEquals("compact,delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
+    assertEquals((Defaults.LogRetentionHours * 60 * 60 * 1000).toString, configs.get(topic1Resource).get(LogConfig.RetentionMsProp).value) // after DELETE the value derived from Defaults is expected
+
+    assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals("lz4", configs.get(topic2Resource).get(LogConfig.CompressionTypeProp).value)
+
+    // Verify the SUBTRACT operation: remove Compact from topic1's CleanupPolicyProp list
+    topic1AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.SUBTRACT)
+    ).asJava
+
+   alterResult = client.incrementalAlterConfigs(Map(
+      topic1Resource -> topic1AlterConfigs
+    ).asJava)
+    alterResult.all.get
+
+    // Verify that topic1 was updated correctly
+    describeResult = client.describeConfigs(Seq(topic1Resource).asJava)
+    configs = describeResult.all.get
+
+    assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
+    assertEquals("1000", configs.get(topic1Resource).get(LogConfig.FlushMsProp).value) // verify previous change is still intact
+
+    // Alter topics with validateOnly=true
+    topic1AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.APPEND)
+    ).asJava
+
+    alterResult = client.incrementalAlterConfigs(Map(
+      topic1Resource -> topic1AlterConfigs
+    ).asJava, new AlterConfigsOptions().validateOnly(true))
+    alterResult.all.get
+
+    // Verify that topics were not updated due to validateOnly = true
+    describeResult = client.describeConfigs(Seq(topic1Resource).asJava)
+    configs = describeResult.all.get
+
+    assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value)
+
+    // Alter topics with validateOnly=true and an invalid config value; the future must fail with InvalidRequestException
+    topic1AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "zip"), AlterConfigOp.OpType.SET)
+    ).asJava
+
+    alterResult = client.incrementalAlterConfigs(Map(
+      topic1Resource -> topic1AlterConfigs
+    ).asJava, new AlterConfigsOptions().validateOnly(true))
+
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
+      Some("Invalid config value for resource"))
+  }
+
+  @Test
+  def testInvalidIncrementalAlterConfigs(): Unit = { // Covers duplicate keys, APPEND/SUBTRACT on CompressionTypeProp, and an invalid value
+    client = AdminClient.create(createConfig)
+
+    // Create topics
+    val topic1 = "incremental-alter-configs-topic-1"
+    val topic1Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
+    createTopic(topic1)
+
+    val topic2 = "incremental-alter-configs-topic-2"
+    val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
+    createTopic(topic2)
+
+    // Add duplicate keys for topic1 (MinCleanableDirtyRatioProp appears twice)
+    var topic1AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.75"), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.65"), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.SET) // valid entry
+    ).asJavaCollection
+
+    // Add a valid config for topic2
+    var topic2AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET)
+    ).asJavaCollection
+
+    var alterResult = client.incrementalAlterConfigs(Map(
+      topic1Resource -> topic1AlterConfigs,
+      topic2Resource -> topic2AlterConfigs
+    ).asJava)
+    assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
+
+    // InvalidRequestException error expected for topic1
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
+      Some("Error due to duplicate config keys"))
+
+    // The operation should succeed for topic2
+    alterResult.values().get(topic2Resource).get()
+
+    // Verify that topic1 config is not updated (not even the valid entry), and topic2 config is updated
+    val describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava)
+    val configs = describeResult.all.get
+    assertEquals(2, configs.size)
+
+    assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topic1Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
+    assertEquals(Defaults.CompressionType.toString, configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value)
+    assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value)
+
+    // Check invalid use of the APPEND/SUBTRACT operation types on CompressionTypeProp
+    topic1AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.APPEND)
+    ).asJavaCollection
+
+    topic2AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy"), AlterConfigOp.OpType.SUBTRACT)
+    ).asJavaCollection
+
+    alterResult = client.incrementalAlterConfigs(Map(
+      topic1Resource -> topic1AlterConfigs,
+      topic2Resource -> topic2AlterConfigs
+    ).asJava)
+    assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet)
+
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
+      Some("Config value append is not allowed for config"))
+
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidRequestException],
+      Some("Config value subtract is not allowed for config"))
+
+
+    // Try to set an invalid config value ("1.1" for MinCleanableDirtyRatioProp)
+    topic1AlterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), AlterConfigOp.OpType.SET)
+    ).asJavaCollection
+
+    alterResult = client.incrementalAlterConfigs(Map(
+      topic1Resource -> topic1AlterConfigs
+    ).asJava)
+    assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
+
+    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException],
+      Some("Invalid config value for resource"))
+  }
 }
 
 object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 9b15389..0bc1045 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -25,7 +25,7 @@ import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
@@ -33,8 +33,9 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message.{ControlledShutdownRequestData, CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message._
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigSet}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
@@ -148,8 +149,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
       ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
       ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse],
-      ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse]
-  )
+      ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse],
+      ApiKeys.INCREMENTAL_ALTER_CONFIGS -> classOf[IncrementalAlterConfigsResponse]
+    )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
@@ -194,7 +196,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
     ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error),
     ApiKeys.ELECT_PREFERRED_LEADERS -> ((resp: ElectPreferredLeadersResponse) =>
-      ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error())
+      ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error()),
+    ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) =>
+      IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error)
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -233,7 +237,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
     ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
     ApiKeys.CREATE_PARTITIONS -> topicAlterAcl,
-    ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl
+    ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl,
+    ApiKeys.INCREMENTAL_ALTER_CONFIGS -> topicAlterConfigsAcl
   )
 
   @Before
@@ -392,6 +397,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
           new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
         ))), true).build()
 
+  private def incrementalAlterConfigsRequest = { // Builds a request that SETs MaxMessageBytesProp to "1000000" on tp.topic
+    val data = new IncrementalAlterConfigsRequestData
+    val alterableConfig = new AlterableConfig
+    alterableConfig.setName(LogConfig.MaxMessageBytesProp).
+      setValue("1000000").setConfigOperation(AlterConfigOp.OpType.SET.id())
+    val alterableConfigSet = new AlterableConfigSet
+    alterableConfigSet.add(alterableConfig)
+    data.resources().add(new AlterConfigsResource().
+      setResourceName(tp.topic).setResourceType(ConfigResource.Type.TOPIC.id()).
+      setConfigs(alterableConfigSet)) // a single TOPIC resource carrying the one config operation
+    new IncrementalAlterConfigsRequest.Builder(data).build()
+  }
+
   private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build()
 
   private def createAclsRequest = new CreateAclsRequest.Builder(
@@ -449,7 +467,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
       // Check StopReplica last since some APIs depend on replica availability
       ApiKeys.STOP_REPLICA -> stopReplicaRequest,
-      ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest
+      ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest,
+      ApiKeys.INCREMENTAL_ALTER_CONFIGS -> incrementalAlterConfigsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index fb7b539..760644e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -39,6 +39,7 @@ import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
@@ -329,7 +330,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     Files.copy(Paths.get(combinedStoreProps.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
       Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
       StandardCopyOption.REPLACE_EXISTING)
-    TestUtils.alterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
     verifySslProduceConsume(sslProperties1, "alter-truststore-4")
     verifySslProduceConsume(sslProperties2, "alter-truststore-5")
   }
@@ -509,7 +510,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // Enable unclean leader election
     val newProps = new Properties
     newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
-    TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get
     waitForConfigOnServer(controller, KafkaConfig.UncleanLeaderElectionEnableProp, "true")
 
     // Verify that the old follower with missing records is elected as the new leader
@@ -908,7 +909,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val unknownConfig = "some.config"
     props.put(unknownConfig, "some.config.value")
 
-    TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1),
       "Listener config not updated")
@@ -971,11 +972,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       .mkString(",")
 
     val props = fetchBrokerConfigsFromZooKeeper(servers.head)
-    val listenerProps = props.asScala.keySet.filter(_.startsWith(listenerPrefix(listenerName)))
-    listenerProps.foreach(props.remove)
+    val deleteListenerProps = new Properties()
+    deleteListenerProps ++= props.asScala.filter(entry => entry._1.startsWith(listenerPrefix(listenerName)))
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, deleteListenerProps, perBrokerConfig = true, opType = OpType.DELETE).all.get
+
+    props.clear()
     props.put(KafkaConfig.ListenersProp, listeners)
     props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
-    TestUtils.alterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, props, perBrokerConfig = true).all.get
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
       "Listeners not updated")
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 05f4bcd..710ed57 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -22,9 +22,9 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
-import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, InitProducerIdRequestData, JoinGroupRequestData, LeaveGroupRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData}
+import org.apache.kafka.common.message._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.ControlledShutdownRequestData
@@ -402,6 +402,10 @@ class RequestQuotaTest extends BaseRequestTest {
                 .setTimeoutMs(0)
                 .setTopicPartitions(Collections.singletonList(partition)))
 
+        case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
+          new IncrementalAlterConfigsRequest.Builder(
+            new IncrementalAlterConfigsRequestData())
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
@@ -501,6 +505,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
       case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs
       case ApiKeys.ELECT_PREFERRED_LEADERS => new ElectPreferredLeadersResponse(response).throttleTimeMs
+      case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
+        new IncrementalAlterConfigsResponse(response, ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion()).throttleTimeMs
       case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
     }
   }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c84d75e..f1a5cca 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,8 +26,8 @@ import java.security.cert.X509Certificate
 import java.time.Duration
 import java.util.{Collections, Properties}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-import javax.net.ssl.X509TrustManager
 
+import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log._
@@ -38,7 +38,8 @@ import Implicits._
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk._
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{KafkaFuture, TopicPartition}
@@ -1410,6 +1411,20 @@ object TestUtils extends Logging {
     adminClient.alterConfigs(configs)
   }
 
+  def incrementalAlterConfigs(servers: Seq[KafkaServer], adminClient: AdminClient, props: Properties,
+                   perBrokerConfig: Boolean, opType: OpType = OpType.SET): AlterConfigsResult  = { // applies `opType` to every entry of `props`
+    val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new ConfigEntry(k, v), opType) }.toList.asJavaCollection
+    val configs = if (perBrokerConfig) {
+      servers.map { server => // one BROKER resource per server, named by its broker id; all share the same ops
+        val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+        (resource, configEntries)
+      }.toMap.asJava
+    } else {
+      Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> configEntries).asJava // empty broker name: presumably the cluster-wide default resource (confirm)
+    }
+    adminClient.incrementalAlterConfigs(configs)
+  }
+
   def alterTopicConfigs(adminClient: AdminClient, topic: String, topicConfigs: Properties): AlterConfigsResult = {
     val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
@@ -1451,15 +1466,18 @@ object TestUtils extends Logging {
     (out.toString, err.toString)
   }
 
-  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
+  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable], // asserts future.get() fails with a cause of type `clazz`
+                                      expectedErrorMessage: Option[String] = None): Unit = { // optional substring the cause's message must contain
     try {
       future.get()
       fail("Expected CompletableFuture.get to return an exception")
     } catch {
       case e: ExecutionException =>
-        val cause = e.getCause()
+        val cause = e.getCause
         assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
-            cause.getClass().getName, clazz.isInstance(cause))
+            cause.getClass.getName, clazz.isInstance(cause))
+        expectedErrorMessage.foreach(message => assertTrue(s"Received error message : ${cause.getMessage}" +
+          s" does not contain expected error message : $message", Option(cause.getMessage).exists(_.contains(message)))) // getMessage may be null
     }
   }
 


Mime
View raw message