kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] 01/02: KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294)
Date Tue, 05 May 2020 21:19:15 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 177abc77ee29721164b8633a62d7b259eca4c7b3
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri Mar 13 18:24:03 2020 +0000

    KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294)
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
    (cherry picked from commit f165cdc325388883541db381c4bdfd30da089b3b)
---
 core/src/main/scala/kafka/log/LogConfig.scala      |   4 +
 .../main/scala/kafka/network/RequestChannel.scala  |  54 +++++-
 .../src/main/scala/kafka/server/AdminManager.scala |  21 +--
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   6 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  14 ++
 .../unit/kafka/network/RequestChannelTest.scala    | 195 +++++++++++++++++++++
 6 files changed, 277 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 4f26716..59fe81c 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -299,6 +299,10 @@ object LogConfig {
 
   def serverConfigName(configName: String): Option[String] = configDef.serverConfigName(configName)
 
+  def configType(configName: String): Option[ConfigDef.Type] = {
+    Option(configDef.configKeys.get(configName)).map(_.`type`)
+  }
+
   /**
    * Create a log config instance using the given properties and defaults
    */
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 531cac1..c9a7d03 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -23,9 +23,15 @@ import java.util.concurrent._
 
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.Meter
+import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
 import kafka.utils.{Logging, NotNothing, Pool}
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
@@ -98,7 +104,7 @@ object RequestChannel extends Logging {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${body[AbstractRequest].toString(details)}"
+    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
 
     def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]):
T = {
       bodyAndSize.request match {
@@ -108,6 +114,52 @@ object RequestChannel extends Logging {
       }
     }
 
+    def loggableRequest: AbstractRequest = {
+
+      def loggableValue(resourceType: ConfigResource.Type, name: String, value: String):
String = {
+        val maybeSensitive = resourceType match {
+          case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
+          case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name))
+          case ConfigResource.Type.BROKER_LOGGER => false
+          case _ => true
+        }
+        if (maybeSensitive) Password.HIDDEN else value
+      }
+
+      bodyAndSize.request match {
+        case alterConfigs: AlterConfigsRequest =>
+          val loggableConfigs = alterConfigs.configs().asScala.map { case (resource, config)
=>
+            val loggableEntries = new AlterConfigsRequest.Config(config.entries.asScala.map
{ entry =>
+                new AlterConfigsRequest.ConfigEntry(entry.name, loggableValue(resource.`type`,
entry.name, entry.value))
+            }.asJavaCollection)
+            (resource, loggableEntries)
+          }.asJava
+          new AlterConfigsRequest.Builder(loggableConfigs, alterConfigs.validateOnly).build(alterConfigs.version())
+
+        case alterConfigs: IncrementalAlterConfigsRequest =>
+          val resources = new AlterConfigsResourceCollection(alterConfigs.data.resources.size)
+          alterConfigs.data().resources().asScala.foreach { resource =>
+            val newResource = new AlterConfigsResource()
+              .setResourceName(resource.resourceName)
+              .setResourceType(resource.resourceType)
+            resource.configs.asScala.foreach { config =>
+              newResource.configs.add(new AlterableConfig()
+                .setName(config.name)
+                .setValue(loggableValue(ConfigResource.Type.forId(resource.resourceType),
config.name, config.value))
+                .setConfigOperation(config.configOperation))
+            }
+            resources.add(newResource)
+          }
+          val data = new IncrementalAlterConfigsRequestData()
+            .setValidateOnly(alterConfigs.data().validateOnly())
+            .setResources(resources)
+          new IncrementalAlterConfigsRequest.Builder(data).build(alterConfigs.version)
+
+        case _ =>
+          bodyAndSize.request
+      }
+    }
+
     trace(s"Processor $processor received request: ${requestDesc(true)}")
 
     def requestThreadTimeNanos: Long = {
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 43f6e46..72b8d98 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -636,14 +636,6 @@ class AdminManager(val config: KafkaConfig,
     DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
   }
 
-  private def configType(name: String, synonyms: List[String]): ConfigDef.Type = {
-    val configType = config.typeOf(name)
-    if (configType != null)
-      configType
-    else
-      synonyms.iterator.map(config.typeOf).find(_ != null).orNull
-  }
-
   private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean):
List[DescribeConfigsResponse.ConfigSynonym] = {
     val dynamicConfig = config.dynamicConfig
     val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()
@@ -664,9 +656,9 @@ class AdminManager(val config: KafkaConfig,
 
   private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms:
Boolean)
                                     (name: String, value: Any): DescribeConfigsResponse.ConfigEntry
= {
-    val configEntryType = logConfig.typeOf(name)
-    val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
-    val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+    val configEntryType = LogConfig.configType(name)
+    val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
+    val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
     val allSynonyms = {
       val list = LogConfig.TopicConfigSynonyms.get(name)
         .map(s => configSynonyms(s, brokerSynonyms(s), isSensitive))
@@ -684,14 +676,13 @@ class AdminManager(val config: KafkaConfig,
   private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
                                      (name: String, value: Any): DescribeConfigsResponse.ConfigEntry
= {
     val allNames = brokerSynonyms(name)
-    val configEntryType = configType(name, allNames)
-    // If we can't determine the config entry type, treat it as a sensitive config to be
safe
-    val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null
+    val configEntryType = KafkaConfig.configType(name)
+    val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
     val valueAsString = if (isSensitive)
       null
     else value match {
       case v: String => v
-      case _ => ConfigDef.convertToString(value, configEntryType)
+      case _ => ConfigDef.convertToString(value, configEntryType.orNull)
     }
     val allSynonyms = configSynonyms(name, allNames, isSensitive)
         .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 7508184..92aa048 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -87,7 +87,11 @@ object DynamicBrokerConfig {
   private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
   private val PerBrokerConfigs = DynamicSecurityConfigs  ++
     DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
-  private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
+  private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp,
+    KafkaConfig.SaslLoginCallbackHandlerClassProp,
+    KafkaConfig.SaslLoginClassProp,
+    KafkaConfig.SaslServerCallbackHandlerClassProp,
+    KafkaConfig.ConnectionsMaxReauthMsProp)
 
   private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8650041..dc5c3c7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1229,6 +1229,20 @@ object KafkaConfig {
 
   def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
 
+  def configType(configName: String): Option[ConfigDef.Type] = {
+    def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
+
+    typeOf(configName) match {
+      case Some(t) => Some(t)
+      case None =>
+        DynamicBrokerConfig.brokerConfigSynonyms(configName, matchListenerOverride = true).flatMap(typeOf).headOption
+    }
+  }
+
+  def maybeSensitive(configType: Option[ConfigDef.Type]): Boolean = {
+    // If we can't determine the config entry type, treat it as a sensitive config to be
safe
+    configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
+  }
 }
 
 class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride:
Option[DynamicBrokerConfig])
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
new file mode 100644
index 0000000..7f570e6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.util.Collections
+
+import kafka.network
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.AlterConfigsRequest._
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.easymock.EasyMock._
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class RequestChannelTest {
+
+  @Test
+  def testAlterRequests(): Unit = {
+
+    val sensitiveValue = "secret"
+    def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], expectedValues:
Map[String, String]): Unit = {
+      val alterConfigs = request(new AlterConfigsRequest.Builder(Collections.singletonMap(resource,
+        new Config(entries.asJavaCollection)), true).build())
+      val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
+      val loggedConfig = loggableAlterConfigs.configs.get(resource)
+      assertEquals(expectedValues, toMap(loggedConfig))
+      val alterConfigsDesc = alterConfigs.requestDesc(details = true)
+      assertFalse(s"Sensitive config logged $alterConfigsDesc", alterConfigsDesc.contains(sensitiveValue))
+    }
+
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+    val keystorePassword = new ConfigEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sensitiveValue)
+    verifyConfig(brokerResource, Seq(keystorePassword), Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
-> Password.HIDDEN))
+
+    val keystoreLocation = new ConfigEntry(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore")
+    verifyConfig(brokerResource, Seq(keystoreLocation), Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG
-> "/path/to/keystore"))
+    verifyConfig(brokerResource, Seq(keystoreLocation, keystorePassword),
+      Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
-> Password.HIDDEN))
+
+    val listenerKeyPassword = new ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}",
sensitiveValue)
+    verifyConfig(brokerResource, Seq(listenerKeyPassword), Map(listenerKeyPassword.name ->
Password.HIDDEN))
+
+    val listenerKeystore = new ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}",
"/path/to/keystore")
+    verifyConfig(brokerResource, Seq(listenerKeystore), Map(listenerKeystore.name -> "/path/to/keystore"))
+
+    val plainJaasConfig = new ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}",
sensitiveValue)
+    verifyConfig(brokerResource, Seq(plainJaasConfig), Map(plainJaasConfig.name -> Password.HIDDEN))
+
+    val plainLoginCallback = new ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS}",
"test.LoginClass")
+    verifyConfig(brokerResource, Seq(plainLoginCallback), Map(plainLoginCallback.name ->
plainLoginCallback.value))
+
+    val customConfig = new ConfigEntry("custom.config", sensitiveValue)
+    verifyConfig(brokerResource, Seq(customConfig), Map(customConfig.name -> Password.HIDDEN))
+
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "testTopic")
+    val compressionType = new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+    verifyConfig(topicResource, Seq(compressionType), Map(TopicConfig.COMPRESSION_TYPE_CONFIG
-> "lz4"))
+    verifyConfig(topicResource, Seq(customConfig), Map(customConfig.name -> Password.HIDDEN))
+
+    // Verify empty request
+    val alterConfigs = request(new AlterConfigsRequest.Builder(Collections.emptyMap[ConfigResource,
Config], true).build())
+    assertEquals(Collections.emptyMap, alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest].configs)
+  }
+
+  @Test
+  def testIncrementalAlterRequests(): Unit = {
+
+    def incrementalAlterConfigs(resource: ConfigResource,
+                                entries: Map[String, String], op: OpType): IncrementalAlterConfigsRequest
= {
+      val data = new IncrementalAlterConfigsRequestData()
+      val alterableConfigs = new AlterableConfigCollection()
+      entries.foreach { case (name, value) =>
+        alterableConfigs.add(new AlterableConfig().setName(name).setValue(value).setConfigOperation(op.id))
+      }
+      data.resources.add(new AlterConfigsResource()
+        .setResourceName(resource.name).setResourceType(resource.`type`.id)
+        .setConfigs(alterableConfigs))
+      new IncrementalAlterConfigsRequest.Builder(data).build()
+    }
+
+    val sensitiveValue = "secret"
+    def verifyConfig(resource: ConfigResource,
+                     op: OpType,
+                     entries: Map[String, String],
+                     expectedValues: Map[String, String]): Unit = {
+      val alterConfigs = request(incrementalAlterConfigs(resource, entries, op))
+      val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
+      val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
+      assertEquals(expectedValues, toMap(loggedConfig))
+      val alterConfigsDesc = alterConfigs.requestDesc(details = true)
+      assertFalse(s"Sensitive config logged $alterConfigsDesc", alterConfigsDesc.contains(sensitiveValue))
+    }
+
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+    val keystorePassword = Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> sensitiveValue)
+    verifyConfig(brokerResource, OpType.SET, keystorePassword, Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
-> Password.HIDDEN))
+
+    val keystoreLocation = Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore")
+    verifyConfig(brokerResource, OpType.SET, keystoreLocation, keystoreLocation)
+    verifyConfig(brokerResource, OpType.SET, keystoreLocation ++ keystorePassword,
+      Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
-> Password.HIDDEN))
+
+    val listenerKeyPassword = Map(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}"
-> sensitiveValue)
+    verifyConfig(brokerResource, OpType.SET, listenerKeyPassword,
+      Map(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}" -> Password.HIDDEN))
+
+    val listenerKeystore = Map(s"listener.name.internal.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}"
-> "/path/to/keystore")
+    verifyConfig(brokerResource, OpType.SET, listenerKeystore, listenerKeystore)
+
+    val plainJaasConfig = Map(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}"
-> sensitiveValue)
+    verifyConfig(brokerResource, OpType.SET, plainJaasConfig,
+      Map(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}" -> Password.HIDDEN))
+
+    val plainLoginCallback = Map(s"listener.name.internal.plain.${SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS}"
-> "test.LoginClass")
+    verifyConfig(brokerResource, OpType.SET, plainLoginCallback, plainLoginCallback)
+
+    val sslProtocols = Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG -> "TLSv1.1")
+    verifyConfig(brokerResource, OpType.APPEND, sslProtocols, Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
-> "TLSv1.1"))
+    verifyConfig(brokerResource, OpType.SUBTRACT, sslProtocols, Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
-> "TLSv1.1"))
+    val cipherSuites = Map(SslConfigs.SSL_CIPHER_SUITES_CONFIG -> null)
+    verifyConfig(brokerResource, OpType.DELETE, cipherSuites, cipherSuites)
+
+    val customConfig = Map("custom.config" -> sensitiveValue)
+    verifyConfig(brokerResource, OpType.SET, customConfig, Map("custom.config" -> Password.HIDDEN))
+
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "testTopic")
+    val compressionType = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4")
+    verifyConfig(topicResource, OpType.SET, compressionType, compressionType)
+    verifyConfig(topicResource, OpType.SET, customConfig, Map("custom.config" -> Password.HIDDEN))
+  }
+
+  @Test
+  def testNonAlterRequestsNotTransformed(): Unit = {
+    val metadataRequest = request(new MetadataRequest.Builder(List("topic").asJava, true).build())
+    assertSame(metadataRequest.body[MetadataRequest], metadataRequest.loggableRequest)
+  }
+
+  def request(req: AbstractRequest): RequestChannel.Request = {
+    val buffer = req.serialize(new RequestHeader(req.api, req.version, "client-id", 1))
+    val requestContext = newRequestContext(buffer)
+    new network.RequestChannel.Request(processor = 1,
+      requestContext,
+      startTimeNanos = 0,
+      createNiceMock(classOf[MemoryPool]),
+      buffer,
+      createNiceMock(classOf[RequestChannel.Metrics])
+    )
+  }
+
+  private def newRequestContext(buffer: ByteBuffer): RequestContext = {
+    new RequestContext(
+      RequestHeader.parse(buffer),
+      "connection-id",
+      InetAddress.getLoopbackAddress,
+      new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
+      ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT,
+      new ClientInformation("name", "version"))
+  }
+
+  private def toMap(config: Config): Map[String, String] = {
+    config.entries.asScala.map(e => e.name -> e.value).toMap
+  }
+
+  private def toMap(config: IncrementalAlterConfigsRequestData.AlterableConfigCollection):
Map[String, String] = {
+    config.asScala.map(e => e.name -> e.value).toMap
+  }
+}
\ No newline at end of file


Mime
View raw message