kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (#6022)
Date Fri, 01 Feb 2019 14:02:38 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4b29487  KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (#6022)
4b29487 is described below

commit 4b29487fa9d3d4ff8adaaaa6204db796eccd3a68
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri Feb 1 06:02:25 2019 -0800

    KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (#6022)
    
    Limit the number of new connections processed in each iteration of each
    Processor. Block the Acceptor if the connection queue is full on all
    Processors. Add a metric that tracks the percentage of time the Acceptor
    is blocked. See KIP-402 for details.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../main/scala/kafka/network/RequestChannel.scala  |   3 +-
 .../main/scala/kafka/network/SocketServer.scala    | 155 ++++++++++++++-------
 core/src/main/scala/kafka/server/KafkaServer.scala |   8 +-
 .../unit/kafka/network/SocketServerTest.scala      |  77 +++++++++-
 4 files changed, 181 insertions(+), 62 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 988c14f..35d64f0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,7 +41,6 @@ object RequestChannel extends Logging {
 
   val RequestQueueSizeMetric = "RequestQueueSize"
   val ResponseQueueSizeMetric = "ResponseQueueSize"
-  val ControlPlaneMetricPrefix = "ControlPlane"
   val ProcessorMetricTag = "processor"
 
   def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
@@ -273,7 +272,7 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int, val metricNamePrefix : String = "") extends KafkaMetricsGroup
{
+class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup
{
   import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 125efdc..d8c4a74 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -29,6 +29,8 @@ import com.yammer.metrics.core.Gauge
 import kafka.cluster.{BrokerEndPoint, EndPoint}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse,
SendResponse, StartThrottlingResponse}
+import kafka.network.Processor._
+import kafka.network.SocketServer._
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils._
@@ -70,27 +72,24 @@ import scala.util.control.ControlThrowable
  */
 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider:
CredentialProvider) extends Logging with KafkaMetricsGroup {
 
-  val DataPlanePrefix = "data-plane"
-  val ControlPlanePrefix = "control-plane"
-
   private val maxQueuedRequests = config.queuedMaxRequests
 
   private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
   this.logIdent = logContext.logPrefix
 
   private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
-  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent",
"socket-server-metrics")
-  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal",
"socket-server-metrics")
+  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent",
MetricsGroup)
+  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal",
MetricsGroup)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName,
memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes,
config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
   // data-plane
   private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests)
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
-  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_
=> new RequestChannel(20, RequestChannel.ControlPlaneMetricPrefix))
+  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_
=> new RequestChannel(20, ControlPlaneMetricPrefix))
 
   private var nextProcessorId = 0
   private var connectionQuotas: ConnectionQuotas = _
@@ -119,12 +118,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
       }
     }
 
-    newGauge("NetworkProcessorAvgIdlePercent",
+    newGauge(s"${DataPlaneMetricPrefix}NetworkProcessorAvgIdlePercent",
       new Gauge[Double] {
 
         def value = SocketServer.this.synchronized {
           val ioWaitRatioMetricNames = dataPlaneProcessors.values.asScala.map { p =>
-            metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
+            metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
           }
           ioWaitRatioMetricNames.map { metricName =>
             Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double],
1.0))
@@ -132,7 +131,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
         }
       }
     )
-    newGauge("ControlPlaneNetworkProcessorAvgIdlePercent",
+    newGauge(s"${ControlPlaneMetricPrefix}NetworkProcessorAvgIdlePercent",
       new Gauge[Double] {
 
         def value = SocketServer.this.synchronized {
@@ -155,7 +154,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
         def value = memoryPool.size() - memoryPool.availableMemory()
       }
     )
-    newGauge("ExpiredConnectionsKilledCount",
+    newGauge(s"${DataPlaneMetricPrefix}ExpiredConnectionsKilledCount",
       new Gauge[Double] {
 
         def value = SocketServer.this.synchronized {
@@ -168,7 +167,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
         }
       }
     )
-    newGauge("ControlPlaneExpiredConnectionsKilledCount",
+    newGauge(s"${ControlPlaneMetricPrefix}ExpiredConnectionsKilledCount",
       new Gauge[Double] {
 
         def value = SocketServer.this.synchronized {
@@ -192,7 +191,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
    * was invoked with `startupProcessors=false`.
    */
   def startDataPlaneProcessors(): Unit = synchronized {
-    dataPlaneAcceptors.values.asScala.foreach { _.startProcessors(DataPlanePrefix) }
+    dataPlaneAcceptors.values.asScala.foreach { _.startProcessors(DataPlaneThreadPrefix)
}
     info(s"Started data-plane processors for ${dataPlaneAcceptors.size} acceptors")
   }
 
@@ -202,8 +201,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
    * was invoked with `startupProcessors=false`.
    */
   def startControlPlaneProcessor(): Unit = synchronized {
-    if (controlPlaneAcceptorOpt.isDefined) {
-      controlPlaneAcceptorOpt.get.startProcessors(ControlPlanePrefix)
+    controlPlaneAcceptorOpt.foreach { controlPlaneAcceptor =>
+      controlPlaneAcceptor.startProcessors(ControlPlaneThreadPrefix)
       info(s"Started control-plane processor for the control-plane acceptor")
     }
   }
@@ -213,7 +212,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
   private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                     endpoints: Seq[EndPoint]): Unit = synchronized
{
     endpoints.foreach { endpoint =>
-      val dataPlaneAcceptor = createAcceptor(endpoint)
+      val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
       addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
       KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
dataPlaneAcceptor).start()
       dataPlaneAcceptor.awaitStartup()
@@ -224,7 +223,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
 
   private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit
= synchronized {
     endpointOpt.foreach { endpoint =>
-      val controlPlaneAcceptor = createAcceptor(endpoint)
+      val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
       val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get,
connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
       controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
       controlPlaneProcessorOpt = Some(controlPlaneProcessor)
@@ -232,18 +231,18 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
       listenerProcessors += controlPlaneProcessor
       controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
       nextProcessorId += 1
-      controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlanePrefix)
-      KafkaThread.nonDaemon(s"control-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
controlPlaneAcceptor).start()
+      controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
+      KafkaThread.nonDaemon(s"${ControlPlaneThreadPrefix}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
controlPlaneAcceptor).start()
       controlPlaneAcceptor.awaitStartup()
       info(s"Created control-plane acceptor and processor for endpoint : $endpoint")
     }
   }
 
-  private def createAcceptor(endPoint: EndPoint) : Acceptor = synchronized {
+  private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = synchronized
{
     val sendBufferSize = config.socketSendBufferBytes
     val recvBufferSize = config.socketReceiveBufferBytes
     val brokerId = config.brokerId
-    new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
+    new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix)
   }
 
   private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener:
Int): Unit = synchronized {
@@ -257,7 +256,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
       nextProcessorId += 1
     }
     listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
-    acceptor.addProcessors(listenerProcessors, DataPlanePrefix)
+    acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
   }
 
   /**
@@ -368,6 +367,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
 
 }
 
+object SocketServer {
+  val MetricsGroup = "socket-server-metrics"
+  val DataPlaneThreadPrefix = "data-plane"
+  val ControlPlaneThreadPrefix = "control-plane"
+  val DataPlaneMetricPrefix = ""
+  val ControlPlaneMetricPrefix = "ControlPlane"
+}
+
 /**
  * A base class with some helper variables and methods
  */
@@ -437,12 +444,15 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
                               val sendBufferSize: Int,
                               val recvBufferSize: Int,
                               brokerId: Int,
-                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas)
with KafkaMetricsGroup {
+                              connectionQuotas: ConnectionQuotas,
+                              metricPrefix: String) extends AbstractServerThread(connectionQuotas)
with KafkaMetricsGroup {
 
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   private val processors = new ArrayBuffer[Processor]()
   private val processorsStarted = new AtomicBoolean
+  private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
+    "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
 
   private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix:
String): Unit = synchronized {
     processors ++= newProcessors
@@ -458,7 +468,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String):
Unit = synchronized {
     processors.foreach { processor =>
-      KafkaThread.nonDaemon(processorThreadPrefix + s"-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+      KafkaThread.nonDaemon(s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
         processor).start()
     }
   }
@@ -487,7 +497,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
     startupComplete()
     try {
-      var currentProcessor = 0
+      var currentProcessorIndex = 0
       while (isRunning) {
         try {
           val ready = nioSelector.select(500)
@@ -499,16 +509,26 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
                 val key = iter.next
                 iter.remove()
                 if (key.isAcceptable) {
-                  val processor = synchronized {
-                    currentProcessor = currentProcessor % processors.size
-                    processors(currentProcessor)
+                  accept(key).foreach { socketChannel =>
+
+                    // Assign the channel to the next processor (using round-robin) to which
the
+                    // channel can be added without blocking. If newConnections queue is
full on
+                    // all processors, block until the last one is able to accept a connection.
+                    var retriesLeft = synchronized(processors.length)
+                    var processor: Processor = null
+                    do {
+                      retriesLeft -= 1
+                      processor = synchronized {
+                        // adjust the index (if necessary) and retrieve the processor atomically
for
+                        // correct behaviour in case the number of processors is reduced
dynamically
+                        currentProcessorIndex = currentProcessorIndex % processors.length
+                        processors(currentProcessorIndex)
+                      }
+                      currentProcessorIndex += 1
+                    } while (!assignNewConnection(socketChannel, processor, retriesLeft ==
0))
                   }
-                  accept(key, processor)
                 } else
                   throw new IllegalStateException("Unrecognized key state for acceptor thread.")
-
-                // round robin to the next processor thread, mod(numProcessors) will be done
later
-                currentProcessor = currentProcessor + 1
               } catch {
                 case e: Throwable => error("Error while accepting connection", e)
               }
@@ -547,10 +567,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
     try {
       serverChannel.socket.bind(socketAddress)
-      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort))
+      info(s"Awaiting socket connections on s${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
     } catch {
       case e: SocketException =>
-        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostString,
port, e.getMessage), e)
+        throw new KafkaException(s"Socket server failed to bind to ${socketAddress.getHostString}:$port:
${e.getMessage}.", e)
     }
     serverChannel
   }
@@ -558,7 +578,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   /**
    * Accept a new connection
    */
-  def accept(key: SelectionKey, processor: Processor) {
+  private def accept(key: SelectionKey): Option[SocketChannel] = {
     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
     val socketChannel = serverSocketChannel.accept()
     try {
@@ -568,20 +588,26 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
       socketChannel.socket().setKeepAlive(true)
       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
         socketChannel.socket().setSendBufferSize(sendBufferSize)
-
-      debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize
[actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
-            .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress,
processor.id,
-                  socketChannel.socket.getSendBufferSize, sendBufferSize,
-                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))
-
-      processor.accept(socketChannel)
+      Some(socketChannel)
     } catch {
       case e: TooManyConnectionsException =>
-        info("Rejected connection from %s, address already has the configured maximum of
%d connections.".format(e.ip, e.count))
+        info(s"Rejected connection from ${e.ip}, address already has the configured maximum
of ${e.count} connections.")
         close(socketChannel)
+        None
     }
   }
 
+  private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock:
Boolean): Boolean = {
+    if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
+      debug(s"Accepted connection from ${socketChannel.socket.getRemoteSocketAddress} on"
+
+        s" ${socketChannel.socket.getLocalSocketAddress} and assigned it to processor ${processor.id},"
+
+        s" sendBufferSize [actual|requested]: [${socketChannel.socket.getSendBufferSize}|$sendBufferSize]"
+
+        s" recvBufferSize [actual|requested]: [${socketChannel.socket.getReceiveBufferSize}|$recvBufferSize]")
+      true
+    } else
+      false
+  }
+
   /**
    * Wakeup the thread for selection.
    */
@@ -594,6 +620,8 @@ private[kafka] object Processor {
   val IdlePercentMetricName = "IdlePercent"
   val NetworkProcessorMetricTag = "networkProcessor"
   val ListenerMetricTag = "listener"
+
+  val ConnectionQueueSize = 20
 }
 
 /**
@@ -613,9 +641,9 @@ private[kafka] class Processor(val id: Int,
                                metrics: Metrics,
                                credentialProvider: CredentialProvider,
                                memoryPool: MemoryPool,
-                               logContext: LogContext) extends AbstractServerThread(connectionQuotas)
with KafkaMetricsGroup {
+                               logContext: LogContext,
+                               connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas)
with KafkaMetricsGroup {
 
-  import Processor._
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
       case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap
{ case (localHost, localPort) =>
@@ -631,7 +659,7 @@ private[kafka] class Processor(val id: Int,
     override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index"
   }
 
-  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
+  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
   private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
 
@@ -643,7 +671,7 @@ private[kafka] class Processor(val id: Int,
   newGauge(IdlePercentMetricName,
     new Gauge[Double] {
       def value = {
-        Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics",
metricTags)))
+        Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup, metricTags)))
           .fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
       }
     },
@@ -798,7 +826,8 @@ private[kafka] class Processor(val id: Int,
   }
 
   private def poll() {
-    try selector.poll(300)
+    val pollTimeout = if (newConnections.isEmpty) 300 else 0
+    try selector.poll(pollTimeout)
     catch {
       case e @ (_: IllegalStateException | _: IOException) =>
         // The exception is not re-thrown and any completed sends/receives/connections/disconnections
@@ -911,20 +940,38 @@ private[kafka] class Processor(val id: Int,
   /**
    * Queue up a new connection for reading
    */
-  def accept(socketChannel: SocketChannel) {
-    newConnections.add(socketChannel)
-    wakeup()
+  def accept(socketChannel: SocketChannel,
+             mayBlock: Boolean,
+             acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
+    val accepted = {
+      if (newConnections.offer(socketChannel))
+        true
+      else if (mayBlock) {
+        val startNs = time.nanoseconds
+        newConnections.put(socketChannel)
+        acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
+        true
+      } else
+        false
+    }
+    if (accepted)
+      wakeup()
+    accepted
   }
 
   /**
-   * Register any new connections that have been queued up
+   * Register any new connections that have been queued up. The number of connections processed
+   * in each iteration is limited to ensure that traffic and connection close notifications
of
+   * existing channels are handled promptly.
    */
   private def configureNewConnections() {
-    while (!newConnections.isEmpty) {
+    var connectionsProcessed = 0
+    while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty)
{
       val channel = newConnections.poll()
       try {
         debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
         selector.register(connectionId(channel.socket), channel)
+        connectionsProcessed += 1
       } catch {
         // We explicitly catch all exceptions and close the socket to avoid a socket leak.
         case e: Throwable =>
@@ -1052,4 +1099,4 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String,
Int]) {
 
 }
 
-class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too
many connections from %s (maximum = %d)".format(ip, count))
+class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException(s"Too
many connections from $ip (maximum = $count)")
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 26e1447..8fc5197 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -299,15 +299,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM,
threadNameP
           fetchManager, brokerTopicStats, clusterId, time, tokenManager)
 
         dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel,
dataPlaneRequestProcessor, time,
-          config.numIoThreads, "RequestHandlerAvgIdlePercent", socketServer.DataPlanePrefix)
+          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
SocketServer.DataPlaneThreadPrefix)
 
-        config.controlPlaneListener.foreach { _ =>
-          controlPlaneRequestProcessor = new KafkaApis(socketServer.controlPlaneRequestChannelOpt.get,
replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+        socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
+          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager,
adminManager, groupCoordinator, transactionCoordinator,
             kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer,
quotaManagers,
             fetchManager, brokerTopicStats, clusterId, time, tokenManager)
 
           controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get,
controlPlaneRequestProcessor, time,
-            1, "ControlPlaneRequestHandlerAvgIdlePercent", socketServer.ControlPlanePrefix)
+            1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
         }
 
         Mx4jLoader.maybeLoad()
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 26db52e..463bbd8 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -26,6 +26,7 @@ import java.util.{HashMap, Properties, Random}
 import com.yammer.metrics.core.{Gauge, Meter}
 import com.yammer.metrics.{Metrics => YammerMetrics}
 import javax.net.ssl._
+
 import kafka.security.CredentialProvider
 import kafka.server.{KafkaConfig, ThrottledChannel}
 import kafka.utils.Implicits._
@@ -184,6 +185,7 @@ class SocketServerTest extends JUnitSuite {
     sendRequest(plainSocket, serializedBytes)
     processRequest(server.dataPlaneRequestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
+    verifyAcceptorBlockedPercent("PLAINTEXT", expectBlocked = false)
   }
 
   @Test
@@ -1074,6 +1076,58 @@ class SocketServerTest extends JUnitSuite {
     })
   }
 
+  @Test
+  def testConnectionRateLimit(): Unit = {
+    shutdownServerAndMetrics(server)
+    val numConnections = 5
+    props.put("max.connections.per.ip", numConnections.toString)
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props), connectionQueueSize
= 1)
+    testableServer.startup()
+    val testableSelector = testableServer.testableSelector
+    val errors = new mutable.HashSet[String]
+
+    def acceptorStackTraces: scala.collection.Map[Thread, String] = {
+      Thread.getAllStackTraces.asScala.filterKeys(_.getName.contains("kafka-socket-acceptor"))
+        .mapValues(_.toList.mkString("\n"))
+    }
+
+    def acceptorBlocked: Boolean = {
+      val stackTraces = acceptorStackTraces
+      if (stackTraces.isEmpty)
+        errors.add(s"Acceptor thread not found, threads=${Thread.getAllStackTraces.keySet}")
+      stackTraces.exists { case (thread, stackTrace) =>
+          thread.getState == Thread.State.WAITING && stackTrace.contains("ArrayBlockingQueue")
+      }
+    }
+
+    def registeredConnectionCount: Int = testableSelector.operationCounts.getOrElse(SelectorOperation.Register,
0)
+
+    try {
+      // Block selector until Acceptor is blocked while connections are pending
+      testableSelector.pollCallback = () => {
+        try {
+          TestUtils.waitUntilTrue(() => errors.nonEmpty || registeredConnectionCount >=
numConnections - 1 || acceptorBlocked,
+            "Acceptor not blocked", waitTimeMs = 10000)
+        } catch {
+          case _: Throwable => errors.add(s"Acceptor not blocked: $acceptorStackTraces")
+        }
+      }
+      testableSelector.operationCounts.clear()
+      val sockets = (1 to numConnections).map(_ => connect(testableServer))
+      TestUtils.waitUntilTrue(() => errors.nonEmpty || registeredConnectionCount == numConnections,
+        "Connections not registered", waitTimeMs = 15000)
+      assertEquals(Set.empty, errors)
+      testableSelector.waitForOperations(SelectorOperation.Register, numConnections)
+      val pollCount = testableSelector.operationCounts(SelectorOperation.Poll)
+      assertTrue(s"Connections created too quickly: $pollCount", pollCount >= numConnections)
+      verifyAcceptorBlockedPercent("PLAINTEXT", expectBlocked = true)
+
+      assertProcessorHealthy(testableServer, sockets)
+    } finally {
+      shutdownServerAndMetrics(testableServer)
+    }
+  }
+
   private def withTestableServer(config : KafkaConfig = config, testWithServer: TestableSocketServer
=> Unit): Unit = {
     props.put("listeners", "PLAINTEXT://localhost:0")
     val testableServer = new TestableSocketServer(config)
@@ -1116,7 +1170,22 @@ class SocketServerTest extends JUnitSuite {
   def isSocketConnectionId(connectionId: String, socket: Socket): Boolean =
     connectionId.contains(s":${socket.getLocalPort}-")
 
-  class TestableSocketServer(config : KafkaConfig = config) extends SocketServer(config,
+  private def verifyAcceptorBlockedPercent(listenerName: String, expectBlocked: Boolean):
Unit = {
+    val blockedPercentMetricMBeanName = "kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener=PLAINTEXT"
+    val blockedPercentMetrics = YammerMetrics.defaultRegistry.allMetrics.asScala
+      .filterKeys(_.getMBeanName == blockedPercentMetricMBeanName).values
+    assertEquals(1, blockedPercentMetrics.size)
+    val blockedPercentMetric = blockedPercentMetrics.head.asInstanceOf[Meter]
+    val blockedPercent = blockedPercentMetric.meanRate
+    if (expectBlocked) {
+      assertTrue(s"Acceptor blocked percent not recorded: $blockedPercent", blockedPercent
> 0.0)
+      assertTrue(s"Unexpected blocked percent in acceptor: $blockedPercent", blockedPercent
<= 1.0)
+    } else {
+      assertEquals(0.0, blockedPercent, 0.001)
+    }
+  }
+
+  class TestableSocketServer(config : KafkaConfig = config, val connectionQueueSize: Int
= 20) extends SocketServer(config,
       new Metrics, Time.SYSTEM, credentialProvider) {
 
     @volatile var selector: Option[TestableSelector] = None
@@ -1124,7 +1193,9 @@ class SocketServerTest extends JUnitSuite {
     override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas:
ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor
= {
       new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
config.connectionsMaxIdleMs,
-        config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider,
memoryPool, new LogContext()) {
+        config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider,
+        memoryPool, new LogContext(), connectionQueueSize) {
+
         override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector
= {
            val testableSelector = new TestableSelector(config, channelBuilder, time, metrics,
metricTags.asScala)
            selector = Some(testableSelector)
@@ -1206,6 +1277,7 @@ class SocketServerTest extends JUnitSuite {
     val allCachedPollData = Seq(cachedCompletedReceives, cachedCompletedSends, cachedDisconnected)
     @volatile var minWakeupCount = 0
     @volatile var pollTimeoutOverride: Option[Long] = None
+    @volatile var pollCallback: () => Unit = () => {}
 
     def addFailure(operation: SelectorOperation, exception: Option[Exception] = None) {
       failures += operation ->
@@ -1247,6 +1319,7 @@ class SocketServerTest extends JUnitSuite {
 
     override def poll(timeout: Long): Unit = {
       try {
+        pollCallback.apply()
         allCachedPollData.foreach(_.reset)
         runOp(SelectorOperation.Poll, None) {
           super.poll(pollTimeoutOverride.getOrElse(timeout))


Mime
View raw message