kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6751; Support dynamic configuration of max.connections.per.ip/max.connections.per.ip.overrides configs (KIP-308) (#5334)
Date Thu, 09 Aug 2018 21:40:30 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92004fa  KAFKA-6751; Support dynamic configuration of max.connections.per.ip/max.connections.per.ip.overrides configs (KIP-308) (#5334)
92004fa is described below

commit 92004fa21a9cc75e3790ab39e44d4d3b754d95d9
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Fri Aug 10 03:10:24 2018 +0530

    KAFKA-6751; Support dynamic configuration of max.connections.per.ip/max.connections.per.ip.overrides configs (KIP-308) (#5334)
    
    KIP-308 implementation. See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=85474993.
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
---
 .../java/org/apache/kafka/common/utils/Utils.java  |  11 ++
 .../org/apache/kafka/common/utils/UtilsTest.java   |  11 ++
 .../main/scala/kafka/network/SocketServer.scala    |  28 +++-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  25 +++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   5 +
 .../kafka/network/DynamicConnectionQuotaTest.scala | 143 +++++++++++++++++++++
 .../kafka/server/DynamicBrokerConfigTest.scala     |  16 +++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   2 +
 docs/configuration.html                            |   8 ++
 9 files changed, 242 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 6e0b693..a6d3e2c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -68,6 +68,8 @@ public final class Utils {
     // IPv6 is supported with [ip] pattern
     private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)");
 
+    private static final Pattern VALID_HOST_CHARACTERS = Pattern.compile("([0-9a-zA-Z\\-%._:]*)");
+
     // Prints up to 2 decimal digits. Used for human readable printing
     private static final DecimalFormat TWO_DIGIT_FORMAT = new DecimalFormat("0.##");
 
@@ -436,6 +438,15 @@ public final class Utils {
     }
 
     /**
+     * Basic validation of the supplied address. checks for valid characters
+     * @param address hostname string to validate
+     * @return true if address contains valid characters
+     */
+    public static boolean validHostPattern(String address) {
+        return VALID_HOST_CHARACTERS.matcher(address).matches();
+    }
+
+    /**
      * Formats hostname and port number as a "host:port" address string,
      * surrounding IPv6 addresses with braces '[', ']'
      * @param host hostname
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 3feeff2..4d1d830 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -39,6 +39,7 @@ import static org.apache.kafka.common.utils.Utils.formatAddress;
 import static org.apache.kafka.common.utils.Utils.formatBytes;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.common.utils.Utils.validHostPattern;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -60,6 +61,16 @@ public class UtilsTest {
     }
 
     @Test
+    public void testHostPattern() {
+        assertTrue(validHostPattern("127.0.0.1"));
+        assertTrue(validHostPattern("mydomain.com"));
+        assertTrue(validHostPattern("MyDomain.com"));
+        assertTrue(validHostPattern("My_Domain.com"));
+        assertTrue(validHostPattern("::1"));
+        assertTrue(validHostPattern("2001:db8:85a3:8d3:1319:8a2e:370"));
+    }
+
+    @Test
     public void testGetPort() {
         assertEquals(8000, getPort("127.0.0.1:8000").intValue());
         assertEquals(8080, getPort("mydomain.com:8080").intValue());
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 62fc7a5..749c921 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -57,9 +57,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time:
Time
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
-  private val maxConnectionsPerIp = config.maxConnectionsPerIp
-  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
-
   private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
   this.logIdent = logContext.logPrefix
 
@@ -90,7 +87,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time:
Time
    */
   def startup(startupProcessors: Boolean = true) {
     this.synchronized {
-      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
+      connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
       createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
       if (startupProcessors) {
         startProcessors()
@@ -229,6 +226,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
     }
   }
 
+  def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+    info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
+    connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
+  }
+
+  def updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides: Map[String, Int]):
Unit = {
+    info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case
(k, v) => s"$k=$v" }.mkString(",")}")
+    connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+  }
+
   /* `protected` for test usage */
   protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName:
ListenerName,
                                       securityProtocol: SecurityProtocol, memoryPool: MemoryPool):
Processor = {
@@ -878,19 +885,28 @@ private[kafka] class Processor(val id: Int,
 
 class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
 
-  private val overrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host),
count) }
+  @volatile private var defaultMaxConnectionsPerIp = defaultMax
+  @volatile private var maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count)
=> (InetAddress.getByName(host), count) }
   private val counts = mutable.Map[InetAddress, Int]()
 
   def inc(address: InetAddress) {
     counts.synchronized {
       val count = counts.getOrElseUpdate(address, 0)
       counts.put(address, count + 1)
-      val max = overrides.getOrElse(address, defaultMax)
+      val max = maxConnectionsPerIpOverrides.getOrElse(address, defaultMaxConnectionsPerIp)
       if (count >= max)
         throw new TooManyConnectionsException(address, max)
     }
   }
 
+  def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+    defaultMaxConnectionsPerIp = maxConnectionsPerIp
+  }
+
+  def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
+    maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host),
count) }
+  }
+
   def dec(address: InetAddress) {
     counts.synchronized {
       val count = counts.getOrElse(address,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 72772fa..19743e5 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -80,7 +80,8 @@ object DynamicBrokerConfig {
     DynamicLogConfig.ReconfigurableConfigs ++
     DynamicThreadPool.ReconfigurableConfigs ++
     Set(KafkaConfig.MetricReporterClassesProp) ++
-    DynamicListenerConfig.ReconfigurableConfigs
+    DynamicListenerConfig.ReconfigurableConfigs ++
+    DynamicConnectionQuota.ReconfigurableConfigs
 
   private val PerBrokerConfigs = DynamicSecurityConfigs  ++
     DynamicListenerConfig.ReconfigurableConfigs
@@ -197,6 +198,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
     addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
+    addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock)
{
@@ -815,3 +817,24 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable
wi
 
 }
 
+object DynamicConnectionQuota {
+  val ReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsPerIpProp, KafkaConfig.MaxConnectionsPerIpOverridesProp)
+}
+
+class DynamicConnectionQuota(server: KafkaServer) extends BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicConnectionQuota.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    server.socketServer.updateMaxConnectionsPerIpOverride(newConfig.maxConnectionsPerIpOverrides)
+
+    if (newConfig.maxConnectionsPerIp != oldConfig.maxConnectionsPerIp)
+      server.socketServer.updateMaxConnectionsPerIp(newConfig.maxConnectionsPerIp)
+  }
+}
+
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3d7367e..b651549 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
@@ -1392,5 +1393,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     if (maxConnectionsPerIp == 0)
       require(!maxConnectionsPerIpOverrides.isEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp}
can be set to zero only if" +
         s" ${KafkaConfig.MaxConnectionsPerIpOverridesProp} property is set.")
+
+    val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address))
+    if (!invalidAddresses.isEmpty)
+      throw new IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp}
contains invalid addresses : ${invalidAddresses.mkString(",")}")
   }
 }
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
new file mode 100644
index 0000000..374556b
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.io.IOException
+import java.net.{InetAddress, Socket}
+import java.util.Properties
+
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert.assertEquals
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+class DynamicConnectionQuotaTest extends BaseRequestTest {
+
+  override def numBrokers = 1
+
+  val topic = "test"
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
+  }
+
+  @Test
+  def testDynamicConnectionQuota(): Unit = {
+    def connect(socketServer: SocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
localAddr: InetAddress = null) = {
+      new Socket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(protocol)),
localAddr, 0)
+    }
+
+    val socketServer = servers.head.socketServer
+    val localAddress = InetAddress.getByName("127.0.0.1")
+    def connectionCount = socketServer.connectionCount(localAddress)
+    val initialConnectionCount = connectionCount
+    val maxConnectionsPerIP = 5
+
+    val props = new Properties
+    props.put(KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString)
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpProp,
maxConnectionsPerIP.toString))
+
+    //wait for adminClient connections to close
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection
count mismatch")
+
+    //create connections up to maxConnectionsPerIP - 1, leave space for one connection
+    var conns = (connectionCount until (maxConnectionsPerIP - 1)).map(_ => connect(socketServer))
+
+    // produce should succeed
+    var produceResponse = sendProduceRequest()
+    assertEquals(1, produceResponse.responses.size)
+    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+    assertEquals(Errors.NONE, partitionResponse.error)
+
+    conns = conns :+ connect(socketServer)
+    // now try one more (should fail)
+    intercept[IOException](sendProduceRequest())
+
+    conns.foreach(conn => conn.close())
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection
count mismatch")
+
+    // Increase MaxConnectionsPerIpOverrides for localhost to 7
+    val maxConnectionsPerIPOverride = 7
+    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride")
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpOverridesProp,
s"localhost:$maxConnectionsPerIPOverride"))
+
+    //wait for adminClient connections to close
+    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection
count mismatch")
+
+    //create connections up to maxConnectionsPerIPOverride - 1, leave space for one connection
+    conns = (connectionCount until maxConnectionsPerIPOverride - 1).map(_ => connect(socketServer))
+
+    // send should succeed
+    produceResponse = sendProduceRequest()
+    assertEquals(1, produceResponse.responses.size)
+    val (tp1, partitionResponse1) = produceResponse.responses.asScala.head
+    assertEquals(Errors.NONE, partitionResponse1.error)
+
+    conns = conns :+ connect(socketServer)
+    // now try one more (should fail)
+    intercept[IOException](sendProduceRequest())
+
+    //close one connection
+    conns.head.close()
+    // send should succeed
+    sendProduceRequest()
+  }
+
+  private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify:
(String, String)): Unit = {
+    val adminClient = createAdminClient()
+    TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig).all.get()
+    waitForConfigOnServer(aPropToVerify._1, aPropToVerify._2)
+    adminClient.close()
+  }
+
+  private def createAdminClient(): AdminClient = {
+    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(securityProtocol.name))
+    val config = new Properties()
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
+    val adminClient = AdminClient.create(config)
+    adminClient
+  }
+
+  private def waitForConfigOnServer(propName: String, propValue: String, maxWaitMs: Long
= 10000): Unit = {
+    TestUtils.retry(maxWaitMs) {
+      assertEquals(propValue, servers.head.config.originals.get(propName))
+    }
+  }
+
+  private def sendProduceRequest(): ProduceResponse = {
+    val topicPartition = new TopicPartition(topic, 0)
+    val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(System.currentTimeMillis(),
"key".getBytes, "value".getBytes))
+    val partitionRecords = Map(topicPartition -> memoryRecords)
+    val request = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
+    val response = connectAndSend(request, ApiKeys.PRODUCE, servers.head.socketServer)
+    ProduceResponse.parse(response, request.version)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 9c8acb4..41b9055 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -171,6 +171,22 @@ class DynamicBrokerConfigTest extends JUnitSuite {
     verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password")
   }
 
+  @Test
+  def testConnectionQuota(): Unit = {
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "100", perBrokerConfig = true,
expectFailure = false)
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "100", perBrokerConfig = false,
expectFailure = false)
+    //MaxConnectionsPerIpProp can be set to zero only if MaxConnectionsPerIpOverridesProp
property is set
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "0", perBrokerConfig = false,
expectFailure = true)
+
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName1:100,hostName2:0",
perBrokerConfig = true,
+      expectFailure = false)
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName1:100,hostName2:0",
perBrokerConfig = false,
+      expectFailure = false)
+    //test invalid address
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName#:100", perBrokerConfig
= true,
+      expectFailure = true)
+  }
+
   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure:
Boolean) {
     val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 0ee8d81..927dd1c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -792,6 +792,8 @@ class KafkaConfigTest {
     assertFalse(isValidKafkaConfig(props))
     props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
     assertTrue(isValidKafkaConfig(props))
+    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
+    assertFalse(isValidKafkaConfig(props))
   }
 
   private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values:
Any*) {
diff --git a/docs/configuration.html b/docs/configuration.html
index cc3b42c..e5576f9 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -203,6 +203,14 @@
     <li><code>background.threads</code></li>
   </ul>
 
+  <h5>Updating ConnectionQuota Configs</h5>
+  The maximum number of connections allowed for a given IP/host by the broker may be updated dynamically at cluster-default level used by all brokers.
+  The changes will apply for new connection creations and the existing connections count will be taken into account by the new limits.
+  <ul>
+    <li><code>max.connections.per.ip</code></li>
+    <li><code>max.connections.per.ip.overrides</code></li>
+  </ul>
+
   <h5>Adding and Removing Listeners</h5>
   <p>Listeners may be added or removed dynamically. When a new listener is added, security configs of the listener must be provided
   as listener configs with the listener prefix <code>listener.name.{listenerName}.</code>. If the new listener uses SASL,


Mime
View raw message