kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [kafka] Diff for: [GitHub] junrao closed pull request #5921: KAFKA-4453 : Added code to separate controller connections and requests from the data plane
Date Sun, 13 Jan 2019 18:17:55 GMT
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b5c6a910b0a..083e9525822 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -107,14 +107,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
-    val brokerNode = broker.node(config.interBrokerListenerName)
+    val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+    val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+    val brokerNode = broker.node(controllerToBrokerListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
-        config.interBrokerSecurityProtocol,
+        controllerToBrokerSecurityProtocol,
         JaasContext.Type.SERVER,
         config,
-        config.interBrokerListenerName,
+        controllerToBrokerListenerName,
         config.saslMechanismInterBrokerProtocol,
         time,
         config.saslInterBrokerHandshakeRequestEnable
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 00b09688c5b..988c14f263e 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,6 +41,7 @@ object RequestChannel extends Logging {
 
   val RequestQueueSizeMetric = "RequestQueueSize"
   val ResponseQueueSizeMetric = "ResponseQueueSize"
+  val ControlPlaneMetricPrefix = "ControlPlane"
   val ProcessorMetricTag = "processor"
 
   def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
@@ -272,17 +273,19 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int, val metricNamePrefix : String = "") extends KafkaMetricsGroup {
   import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
+  val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
+  val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
 
-  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
+  newGauge(requestQueueSizeMetricName, new Gauge[Int] {
       def value = requestQueue.size
   })
 
-  newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
+  newGauge(responseQueueSizeMetricName, new Gauge[Int]{
     def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
       total + processor.responseQueueSize
     }
@@ -292,7 +295,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
     if (processors.putIfAbsent(processor.id, processor) != null)
       warn(s"Unexpected processor with processorId ${processor.id}")
 
-    newGauge(ResponseQueueSizeMetric,
+    newGauge(responseQueueSizeMetricName,
       new Gauge[Int] {
         def value = processor.responseQueueSize
       },
@@ -302,7 +305,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
 
   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
-    removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString))
+    removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString))
   }
 
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index ae09a03b7dd..125efdc373d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -51,13 +51,28 @@ import scala.collection.mutable.{ArrayBuffer, Buffer}
 import scala.util.control.ControlThrowable
 
 /**
- * An NIO socket server. The threading model is
- *   1 Acceptor thread that handles new connections
- *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
- *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
+ * Handles new connections, requests and responses to and from broker.
+ * Kafka supports two types of request planes :
+ *  - data-plane :
+ *    - Handles requests from clients and other brokers in the cluster.
+ *    - The threading model is
+ *      1 Acceptor thread per listener, that handles new connections.
+ *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
+ *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
+ *      M Handler threads that handle requests and produce responses back to the processor threads for writing.
+ *  - control-plane :
+ *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
+ *      If not configured, the controller requests are handled by the data-plane.
+ *    - The threading model is
+ *      1 Acceptor thread that handles new connections
+ *      Acceptor has 1 Processor thread that has its own selector and reads requests from the socket.
+ *      1 Handler thread that handles requests and produces responses back to the processor thread for writing.
  */
 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
 
+  val DataPlanePrefix = "data-plane"
+  val ControlPlanePrefix = "control-plane"
+
   private val maxQueuedRequests = config.queuedMaxRequests
 
   private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
@@ -68,11 +83,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  val requestChannel = new RequestChannel(maxQueuedRequests)
-  private val processors = new ConcurrentHashMap[Int, Processor]()
-  private var nextProcessorId = 0
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests)
+  // control-plane
+  private var controlPlaneProcessorOpt : Option[Processor] = None
+  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
+  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, RequestChannel.ControlPlaneMetricPrefix))
 
-  private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  private var nextProcessorId = 0
   private var connectionQuotas: ConnectionQuotas = _
   private var stoppedProcessingRequests = false
 
@@ -91,9 +111,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def startup(startupProcessors: Boolean = true) {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
-      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
+      createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
+      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
       if (startupProcessors) {
-        startProcessors()
+        startControlPlaneProcessor()
+        startDataPlaneProcessors()
       }
     }
 
@@ -101,12 +123,25 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       new Gauge[Double] {
 
         def value = SocketServer.this.synchronized {
-          val ioWaitRatioMetricNames = processors.values.asScala.map { p =>
+          val ioWaitRatioMetricNames = dataPlaneProcessors.values.asScala.map { p =>
             metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
           }
           ioWaitRatioMetricNames.map { metricName =>
             Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-          }.sum / processors.size
+          }.sum / dataPlaneProcessors.size
+        }
+      }
+    )
+    newGauge("ControlPlaneNetworkProcessorAvgIdlePercent",
+      new Gauge[Double] {
+
+        def value = SocketServer.this.synchronized {
+          val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
+            metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
+          }
+          ioWaitRatioMetricName.map { metricName =>
+            Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
+          }.getOrElse(Double.NaN)
         }
       }
     )
@@ -124,7 +159,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       new Gauge[Double] {
 
         def value = SocketServer.this.synchronized {
-          val expiredConnectionsKilledCountMetricNames = processors.values.asScala.map { p =>
+          val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.values.asScala.map { p =>
             metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags)
           }
           expiredConnectionsKilledCountMetricNames.map { metricName =>
@@ -133,96 +168,148 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         }
       }
     )
-    info(s"Started ${acceptors.size} acceptor threads")
+    newGauge("ControlPlaneExpiredConnectionsKilledCount",
+      new Gauge[Double] {
+
+        def value = SocketServer.this.synchronized {
+          val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
+            metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags)
+          }
+          expiredConnectionsKilledCountMetricNames.map { metricName =>
+            Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
+          }.getOrElse(0.0)
+        }
+      }
+    )
+    info(s"Started ${dataPlaneAcceptors.size} acceptor threads for data-plane")
+    if (controlPlaneAcceptorOpt.isDefined)
+      info("Started control-plane acceptor thread")
   }
 
   /**
-   * Starts processors of all the acceptors of this server if they have not already been started.
-   * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]]
+   * Starts processors of all the data-plane acceptors of this server if they have not already been started.
+   * This method is used for delayed starting of data-plane processors if [[kafka.network.SocketServer#startup]]
    * was invoked with `startupProcessors=false`.
    */
-  def startProcessors(): Unit = synchronized {
-    acceptors.values.asScala.foreach { _.startProcessors() }
-    info(s"Started processors for ${acceptors.size} acceptors")
+  def startDataPlaneProcessors(): Unit = synchronized {
+    dataPlaneAcceptors.values.asScala.foreach { _.startProcessors(DataPlanePrefix) }
+    info(s"Started data-plane processors for ${dataPlaneAcceptors.size} acceptors")
+  }
+
+  /**
+   * Start the processor of control-plane acceptor of this server if it has not already been started.
+   * This method is used for delayed starting of control-plane processor if [[kafka.network.SocketServer#startup]]
+   * was invoked with `startupProcessors=false`.
+   */
+  def startControlPlaneProcessor(): Unit = synchronized {
+    if (controlPlaneAcceptorOpt.isDefined) {
+      controlPlaneAcceptorOpt.get.startProcessors(ControlPlanePrefix)
+      info(s"Started control-plane processor for the control-plane acceptor")
+    }
   }
 
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
-  private def createAcceptorAndProcessors(processorsPerListener: Int,
-                                          endpoints: Seq[EndPoint]): Unit = synchronized {
+  private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
+                                                    endpoints: Seq[EndPoint]): Unit = synchronized {
+    endpoints.foreach { endpoint =>
+      val dataPlaneAcceptor = createAcceptor(endpoint)
+      addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
+      KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
+      dataPlaneAcceptor.awaitStartup()
+      dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
+      info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
+    }
+  }
+
+  private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
+    endpointOpt.foreach { endpoint =>
+      val controlPlaneAcceptor = createAcceptor(endpoint)
+      val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
+      controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
+      controlPlaneProcessorOpt = Some(controlPlaneProcessor)
+      val listenerProcessors = new ArrayBuffer[Processor]()
+      listenerProcessors += controlPlaneProcessor
+      controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
+      nextProcessorId += 1
+      controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlanePrefix)
+      KafkaThread.nonDaemon(s"control-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", controlPlaneAcceptor).start()
+      controlPlaneAcceptor.awaitStartup()
+      info(s"Created control-plane acceptor and processor for endpoint : $endpoint")
+    }
+  }
 
+  private def createAcceptor(endPoint: EndPoint) : Acceptor = synchronized {
     val sendBufferSize = config.socketSendBufferBytes
     val recvBufferSize = config.socketReceiveBufferBytes
     val brokerId = config.brokerId
-
-    endpoints.foreach { endpoint =>
-      val listenerName = endpoint.listenerName
-      val securityProtocol = endpoint.securityProtocol
-
-      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
-      addProcessors(acceptor, endpoint, processorsPerListener)
-      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
-      acceptor.awaitStartup()
-      acceptors.put(endpoint, acceptor)
-    }
+    new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
   }
 
-  private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
+  private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
     val listenerName = endpoint.listenerName
     val securityProtocol = endpoint.securityProtocol
     val listenerProcessors = new ArrayBuffer[Processor]()
-
     for (_ <- 0 until newProcessorsPerListener) {
-      val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
+      val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
       listenerProcessors += processor
-      requestChannel.addProcessor(processor)
+      dataPlaneRequestChannel.addProcessor(processor)
       nextProcessorId += 1
     }
-    listenerProcessors.foreach(p => processors.put(p.id, p))
-    acceptor.addProcessors(listenerProcessors)
+    listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
+    acceptor.addProcessors(listenerProcessors, DataPlanePrefix)
   }
 
   /**
-    * Stop processing requests and new connections.
-    */
+   * Stop processing requests and new connections.
+   */
   def stopProcessingRequests() = {
     info("Stopping socket server request processors")
     this.synchronized {
-      acceptors.asScala.values.foreach(_.shutdown())
-      processors.asScala.values.foreach(_.shutdown())
-      requestChannel.clear()
+      dataPlaneAcceptors.asScala.values.foreach(_.shutdown())
+      controlPlaneAcceptorOpt.foreach(_.shutdown())
+      dataPlaneProcessors.asScala.values.foreach(_.shutdown())
+      controlPlaneProcessorOpt.foreach(_.shutdown())
+      dataPlaneRequestChannel.clear()
+      controlPlaneRequestChannelOpt.foreach(_.clear())
       stoppedProcessingRequests = true
     }
     info("Stopped socket server request processors")
   }
 
   def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
-    info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
+    info(s"Resizing network thread pool size for each data-plane listener from $oldNumNetworkThreads to $newNumNetworkThreads")
     if (newNumNetworkThreads > oldNumNetworkThreads) {
-      acceptors.asScala.foreach { case (endpoint, acceptor) =>
-        addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
+      dataPlaneAcceptors.asScala.foreach { case (endpoint, acceptor) =>
+        addDataPlaneProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
       }
     } else if (newNumNetworkThreads < oldNumNetworkThreads)
-      acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
+      dataPlaneAcceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, dataPlaneRequestChannel))
   }
 
   /**
-    * Shutdown the socket server. If still processing requests, shutdown
-    * acceptors and processors first.
-    */
+   * Shutdown the socket server. If still processing requests, shutdown
+   * acceptors and processors first.
+   */
   def shutdown() = {
     info("Shutting down socket server")
     this.synchronized {
       if (!stoppedProcessingRequests)
         stopProcessingRequests()
-      requestChannel.shutdown()
+      dataPlaneRequestChannel.shutdown()
+      controlPlaneRequestChannelOpt.foreach(_.shutdown())
     }
     info("Shutdown completed")
   }
 
   def boundPort(listenerName: ListenerName): Int = {
     try {
-      acceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort
+      val acceptor = dataPlaneAcceptors.get(endpoints(listenerName))
+      if (acceptor != null) {
+        acceptor.serverChannel.socket.getLocalPort
+      } else {
+        controlPlaneAcceptorOpt.map (_.serverChannel.socket().getLocalPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane"))
+      }
     } catch {
       case e: Exception =>
         throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
@@ -230,15 +317,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   }
 
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
-    info(s"Adding listeners for endpoints $listenersAdded")
-    createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
-    startProcessors()
+    info(s"Adding data-plane listeners for endpoints $listenersAdded")
+    createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, listenersAdded)
+    startDataPlaneProcessors()
   }
 
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
-    info(s"Removing listeners for endpoints $listenersRemoved")
+    info(s"Removing data-plane listeners for endpoints $listenersRemoved")
     listenersRemoved.foreach { endpoint =>
-      acceptors.asScala.remove(endpoint).foreach(_.shutdown())
+      dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown())
     }
   }
 
@@ -252,8 +339,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
   }
 
-  /* `protected` for test usage */
-  protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+  // `protected` for test usage
+  protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                       securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
     new Processor(id,
       time,
@@ -272,12 +359,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     )
   }
 
-  /* For test usage */
+  // For test usage
   private[network] def connectionCount(address: InetAddress): Int =
     Option(connectionQuotas).fold(0)(_.get(address))
 
-  /* For test usage */
-  private[network] def processor(index: Int): Processor = processors.get(index)
+  // For test usage
+  private[network] def dataPlaneProcessor(index: Int): Processor = dataPlaneProcessors.get(index)
 
 }
 
@@ -357,21 +444,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   private val processors = new ArrayBuffer[Processor]()
   private val processorsStarted = new AtomicBoolean
 
-  private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = synchronized {
+  private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
     processors ++= newProcessors
     if (processorsStarted.get)
-      startProcessors(newProcessors)
+      startProcessors(newProcessors, processorThreadPrefix)
   }
 
-  private[network] def startProcessors(): Unit = synchronized {
+  private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
     if (!processorsStarted.getAndSet(true)) {
-      startProcessors(processors)
+      startProcessors(processors, processorThreadPrefix)
     }
   }
 
-  private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
+  private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
     processors.foreach { processor =>
-      KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+      KafkaThread.nonDaemon(processorThreadPrefix + s"-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
         processor).start()
     }
   }
@@ -444,9 +531,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     }
   }
 
-  /*
-   * Create a server socket to listen for connections on.
-   */
+  /**
+   * Create a server socket to listen for connections on.
+   */
   private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
     val socketAddress =
       if (host == null || host.trim.isEmpty)
@@ -468,7 +555,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     serverChannel
   }
 
-  /*
+  /**
    * Accept a new connection
    */
   def accept(key: SelectionKey, processor: Processor) {
@@ -564,7 +651,7 @@ private[kafka] class Processor(val id: Int,
     // also includes the listener name)
     Map(NetworkProcessorMetricTag -> id.toString)
   )
-  
+
   val expiredConnectionsKilledCount = new Total()
   private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", "socket-server-metrics", metricTags)
   metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
@@ -688,7 +775,7 @@ private[kafka] class Processor(val id: Int,
     }
   }
 
-  /* `protected` for test usage */
+  // `protected` for test usage
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
     val connectionId = response.request.context.connectionId
     trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2c0f6c1b52c..1c5657263c8 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -640,7 +640,7 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
 
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     if (newConfig.numIoThreads != oldConfig.numIoThreads)
-      server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
+      server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
     if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
       server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
     if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 13f555a2059..c3c3295835c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -286,6 +286,7 @@ object KafkaConfig {
   val AdvertisedPortProp = "advertised.port"
   val AdvertisedListenersProp = "advertised.listeners"
   val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
+  val ControlPlaneListenerNameProp = "control.plane.listener.name"
   val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
   val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
   val SocketRequestMaxBytesProp = "socket.request.max.bytes"
@@ -503,7 +504,7 @@ object KafkaConfig {
   val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
   val NumReplicaAlterLogDirsThreadsDoc = "The number of threads that can move replicas between log directories, which may include disk I/O"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
-  val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
+  val QueuedMaxRequestsDoc = "The number of queued requests allowed for data-plane, before blocking the network threads"
   val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
   val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   /************* Authorizer Configuration ***********/
@@ -546,6 +547,22 @@ object KafkaConfig {
     "prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
     "INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. " +
     "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). "
+  val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " +
+    s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " +
+    "For example, if a broker's config is :\n" +
+    "listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094\n" +
+    "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" +
+    "control.plane.listener.name = CONTROLLER\n" +
+    "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".\n" +
+    s"On controller side, when it discovers a broker's published endpoints through zookeeper, it will use the $ControlPlaneListenerNameProp " +
+    "to find the endpoint, which it will use to establish connection to the broker.\n" +
+    "For example, if the broker's published endpoints on zookeeper are :\n" +
+    "\"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" +
+    " and the controller's config is :\n" +
+    "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" +
+    "control.plane.listener.name = CONTROLLER\n" +
+    "then controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" +
+    "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections."
 
   val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
   val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
@@ -846,6 +863,7 @@ object KafkaConfig {
       .define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc)
       .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
       .define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc)
+      .define(ControlPlaneListenerNameProp, STRING, null, HIGH, controlPlaneListenerNameDoc)
       .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
       .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
       .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
@@ -1265,6 +1283,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
   def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
+  def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => listenerName }
+  def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => securityProtocol }
   def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
   val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
 
@@ -1337,6 +1357,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
   }
 
+  // The endpoint of the control plane listener, or None when no control plane listener name is configured.
+  def controlPlaneListener: Option[EndPoint] = controlPlaneListenerName.map { name =>
+    listeners.find(_.listenerName.value() == name.value()).getOrElse(throw new ConfigException(
+      s"Listener ${name.value} defined in ${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenersProp}."))
+  }
+
+  // Every listener except the control plane one. The configured name is normalised before comparing,
+  // as getControlPlaneListenerNameAndSecurityProtocol does, so both views agree on which endpoint is excluded.
+  def dataPlaneListeners: Seq[EndPoint] = {
+    val controlPlaneName = Option(getString(KafkaConfig.ControlPlaneListenerNameProp)).map(name => ListenerName.normalised(name).value)
+    listeners.filterNot(endpoint => controlPlaneName.contains(endpoint.listenerName.value()))
+  }
+
   // If the user defined advertised listeners, we use those
   // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
   // If none of these are defined, we'll use the listeners
@@ -1368,6 +1401,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Resolves the optional control plane listener name to its (listener name, security protocol) pair.
+  // Yields None when the property is unset.
+  private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
+    Option(getString(KafkaConfig.ControlPlaneListenerNameProp)).map { name =>
+      val listenerName = ListenerName.normalised(name)
+      // a configured listener without an entry in the protocol map is a configuration error
+      val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
+        throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
+          s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
+      (listenerName, securityProtocol)
+    }
+  }
+
   private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = {
     try SecurityProtocol.forName(protocolName)
     catch {
@@ -1419,6 +1465,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
       s"Use a routable IP address.")
 
+    // validate control.plane.listener.name config
+    if (controlPlaneListenerName.isDefined) {
+      require(advertisedListenerNames.contains(controlPlaneListenerName.get),
+        s"${KafkaConfig.ControlPlaneListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
+        s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
+      // controlPlaneListenerName should be different from interBrokerListenerName
+      require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
+        s"${KafkaConfig.ControlPlaneListenerNameProp}, when defined, should have a different value from the inter broker listener name. " +
+        s"Currently they both have the value ${controlPlaneListenerName.get}")
+    }
+
     val recordVersion = logMessageFormatVersion.recordVersion
     require(interBrokerProtocolVersion.recordVersion.value >= recordVersion.value,
       s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
@@ -1443,7 +1500,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     if (connectionsMaxIdleMs >= 0)
       require(failedAuthenticationDelayMs < connectionsMaxIdleMs,
         s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
-          s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
-          " authentication responses from timing out")
+        s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
+        s" authentication responses from timing out")
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d0d41219663..e0ad1b60cac 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -96,13 +96,15 @@ class KafkaRequestHandlerPool(val brokerId: Int,
                               val requestChannel: RequestChannel,
                               val apis: KafkaApis,
                               time: Time,
-                              numThreads: Int) extends Logging with KafkaMetricsGroup {
+                              numThreads: Int,
+                              requestHandlerAvgIdleMetricName: String,
+                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
 
   private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
   /* a meter to track the average free capacity of the request handlers */
-  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
 
-  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
+  this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
   val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
   for (i <- 0 until numThreads) {
     createHandler(i)
@@ -110,7 +112,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
 
   def createHandler(id: Int): Unit = synchronized {
     runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
-    KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
+    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
   }
 
   def resizeThreadPool(newSize: Int): Unit = synchronized {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 36bc0f13064..26e1447fb0f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -113,10 +113,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   val brokerState: BrokerState = new BrokerState
 
-  var apis: KafkaApis = null
+  var dataPlaneRequestProcessor: KafkaApis = null
+  var controlPlaneRequestProcessor: KafkaApis = null
+
   var authorizer: Option[Authorizer] = None
   var socketServer: SocketServer = null
-  var requestHandlerPool: KafkaRequestHandlerPool = null
+  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
 
   var logDirFailureChannel: LogDirFailureChannel = null
   var logManager: LogManager = null
@@ -291,12 +294,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
           kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
           fetchManager, brokerTopicStats, clusterId, time, tokenManager)
 
-        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
-          config.numIoThreads)
+        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
+          config.numIoThreads, "RequestHandlerAvgIdlePercent", socketServer.DataPlanePrefix)
+
+        config.controlPlaneListener.foreach { _ =>
+          controlPlaneRequestProcessor = new KafkaApis(socketServer.controlPlaneRequestChannelOpt.get, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+            fetchManager, brokerTopicStats, clusterId, time, tokenManager)
+
+          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
+            1, "ControlPlaneRequestHandlerAvgIdlePercent", socketServer.ControlPlanePrefix)
+        }
 
         Mx4jLoader.maybeLoad()
 
@@ -313,7 +325,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
-        socketServer.startProcessors()
+        socketServer.startDataPlaneProcessors()
+        socketServer.startControlPlaneProcessor()
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
@@ -579,14 +592,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         // Socket server will be shutdown towards the end of the sequence.
         if (socketServer != null)
           CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
-        if (requestHandlerPool != null)
-          CoreUtils.swallow(requestHandlerPool.shutdown(), this)
-
+        if (dataPlaneRequestHandlerPool != null)
+          CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+        if (controlPlaneRequestHandlerPool != null)
+          CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
         if (kafkaScheduler != null)
           CoreUtils.swallow(kafkaScheduler.shutdown(), this)
 
-        if (apis != null)
-          CoreUtils.swallow(apis.close(), this)
+        if (dataPlaneRequestProcessor != null)
+          CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
+        if (controlPlaneRequestProcessor != null)
+          CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
         CoreUtils.swallow(authorizer.foreach(_.close()), this)
         if (adminManager != null)
           CoreUtils.swallow(adminManager.shutdown(), this)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index ee69ae42a0f..92d1758b194 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -244,9 +244,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     client = AdminClient.create(createConfig())
     val nodes = client.describeCluster.nodes.get()
     val clusterId = client.describeCluster().clusterId().get()
-    assertEquals(servers.head.apis.clusterId, clusterId)
+    assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId)
     val controller = client.describeCluster().controller().get()
-    assertEquals(servers.head.apis.metadataCache.getControllerId.
+    assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
       getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
     val brokers = brokerList.split(",")
     assertEquals(brokers.size, nodes.size)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1b51101e76f..4c0459e72e0 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1452,9 +1452,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   def removeAllAcls() = {
-    servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
-      servers.head.apis.authorizer.get.removeAcls(resource)
-      TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource)
+    servers.head.dataPlaneRequestProcessor.authorizer.get.getAcls().keys.foreach { resource =>
+      servers.head.dataPlaneRequestProcessor.authorizer.get.removeAcls(resource)
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.dataPlaneRequestProcessor.authorizer.get, resource)
     }
   }
 
@@ -1507,8 +1507,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def addAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
-    servers.head.apis.authorizer.get.addAcls(acls, resource)
-    TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
+    servers.head.dataPlaneRequestProcessor.authorizer.get.addAcls(acls, resource)
+    TestUtils.waitAndVerifyAcls(servers.head.dataPlaneRequestProcessor.authorizer.get.getAcls(resource) ++ acls, servers.head.dataPlaneRequestProcessor.authorizer.get, resource)
   }
 
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 9b02d809b55..b28a40f890e 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -285,7 +285,7 @@ abstract class QuotaTestClients(topic: String,
 
   def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode) {
     TestUtils.retry(10000) {
-      val quotaManagers = server.apis.quotas
+      val quotaManagers = server.dataPlaneRequestProcessor.quotas
       val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId)
       val overrideConsumerQuota = quota(quotaManagers.fetch, userPrincipal, consumerClientId)
       val overrideProducerRequestQuota = quota(quotaManagers.request, userPrincipal, producerClientId)
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 854e3381342..0587a6de160 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -182,8 +182,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   override def setUp() {
     super.setUp()
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
-      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, Resource(Topic, "*", LITERAL))
+      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.dataPlaneRequestProcessor.authorizer.get, Resource.ClusterResource)
+      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, Resource(Topic, "*", LITERAL))
     }
     // create the test topic with all the brokers as replicas
     createTopic(topic, 1, 3)
@@ -221,7 +221,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       assertTrue("failed re-authentications not 0", TestUtils.totalMetricValue(s, "failed-reauthentication-total") == 0)
     }
   }
-  
+
   private def getGauge(metricName: String) = {
     Metrics.defaultRegistry.allMetrics.asScala
            .filterKeys(k => k.getName == metricName)
@@ -275,16 +275,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   private def setWildcardResourceAcls() {
     AclCommand.main(produceConsumeWildcardAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource)
     }
   }
 
   private def setPrefixedResourceAcls() {
     AclCommand.main(produceConsumePrefixedAclsArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, prefixedTopicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, prefixedGroupResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource)
     }
   }
 
@@ -292,9 +292,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(consumeAclArgs(tp.topic))
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get,
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get,
         new Resource(Topic, tp.topic, PatternType.LITERAL))
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
@@ -315,7 +315,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def testNoProduceWithDescribeAcl(): Unit = {
     AclCommand.main(describeAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
     }
     try{
       val producer = createProducer()
@@ -327,7 +327,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     }
     confirmReauthenticationMetrics
   }
-  
+
    /**
     * Tests that a consumer fails to consume messages without the appropriate
     * ACL set.
@@ -341,7 +341,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumeRecords(consumer)
     confirmReauthenticationMetrics
   }
-  
+
   @Test(expected = classOf[TopicAuthorizationException])
   def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
     noConsumeWithoutDescribeAclSetup()
@@ -350,13 +350,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // this should timeout since the consumer will not be able to fetch any metadata for the topic
     consumeRecords(consumer, timeout = 3000)
   }
-  
+
   private def noConsumeWithoutDescribeAclSetup(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
 
     val producer = createProducer()
@@ -365,10 +365,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     AclCommand.main(deleteDescribeAclArgs)
     AclCommand.main(deleteWriteAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
   }
-  
+
   @Test
   def testNoConsumeWithDescribeAclViaAssign(): Unit = {
     noConsumeWithDescribeAclSetup()
@@ -384,7 +384,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     }
     confirmReauthenticationMetrics
   }
-  
+
   @Test
   def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
     noConsumeWithDescribeAclSetup()
@@ -400,13 +400,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     }
     confirmReauthenticationMetrics
   }
-  
+
   private def noConsumeWithDescribeAclSetup(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
@@ -420,7 +420,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def testNoGroupAcl(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 55e1529c963..cb2186c4227 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -71,7 +71,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def addClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
     val acls = Set(clusterAcl(permissionType, operation))
-    val authorizer = servers.head.apis.authorizer.get
+    val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
     val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
     authorizer.addAcls(acls, AuthResource.ClusterResource)
     TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource)
@@ -79,7 +79,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
     val acls = Set(clusterAcl(permissionType, operation))
-    val authorizer = servers.head.apis.authorizer.get
+    val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
     val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
     Assert.assertTrue(authorizer.removeAcls(acls, AuthResource.ClusterResource))
     TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, AuthResource.ClusterResource)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 853f99944ff..c13b0a384c8 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -501,8 +501,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   @Test
   def testThreadPoolResize(): Unit = {
-    val requestHandlerPrefix = "kafka-request-handler-"
-    val networkThreadPrefix = "kafka-network-thread-"
+    val requestHandlerPrefix = "data-plane-kafka-request-handler-"
+    val networkThreadPrefix = "data-plane-kafka-network-thread-"
     val fetcherThreadPrefix = "ReplicaFetcherThread-"
     // Executor threads and recovery threads are not verified since threads may not be running
     // For others, thread count should be configuredCount * threadMultiplier * numBrokers
@@ -577,7 +577,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       "", mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
       networkThreadPrefix, mayReceiveDuplicates = true)
-    verifyThreads("kafka-socket-acceptor-", config.listeners.size)
+    verifyThreads("data-plane-kafka-socket-acceptor-", config.listeners.size)
 
     verifyProcessorMetrics()
     verifyMarkPartitionsForTruncation()
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 2306a921898..a64f6e7ab92 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -175,8 +175,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // Test that the existing clientId overrides are read
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
     servers = Seq(server)
-    assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
-    assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 23f62252fcb..08747a85a8e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -32,8 +32,10 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
 import org.apache.log4j.Level
 import kafka.utils.LogCaptureAppender
+import org.apache.kafka.common.metrics.KafkaMetric
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Try
 
 class ControllerIntegrationTest extends ZooKeeperTestHarness {
@@ -83,6 +85,31 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move")
   }
 
+  @Test
+  def testMetadataPropagationOnControlPlane(): Unit = {
+    // Both listeners bind to an ephemeral port (0) so the test cannot fail because a fixed port is already in use
+    servers = makeServers(1, listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:0"),
+      listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
+      controlPlaneListenerName = Some("CONTROLLER"))
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    val controlPlaneMetricMap = mutable.Map[String, KafkaMetric]()
+    val dataPlaneMetricMap = mutable.Map[String, KafkaMetric]()
+    servers.head.metrics.metrics().values().asScala.foreach { kafkaMetric =>
+      val tagValues = kafkaMetric.metricName().tags().values()
+      if (tagValues.contains("CONTROLLER")) controlPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
+      if (tagValues.contains("PLAINTEXT")) dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
+    }
+    def metricValue(metricMap: mutable.Map[String, KafkaMetric], name: String): Double = metricMap(name).metricValue().asInstanceOf[Double]
+    assertEquals(1.0, metricValue(controlPlaneMetricMap, "response-total"), 0)
+    assertEquals(0.0, metricValue(dataPlaneMetricMap, "response-total"), 0)
+    assertEquals(1.0, metricValue(controlPlaneMetricMap, "request-total"), 0)
+    assertEquals(0.0, metricValue(dataPlaneMetricMap, "request-total"), 0)
+    assertTrue(metricValue(controlPlaneMetricMap, "incoming-byte-total") > 1.0)
+    assertEquals(0.0, metricValue(dataPlaneMetricMap, "incoming-byte-total"), 0)
+    assertEquals(2.0, metricValue(controlPlaneMetricMap, "network-io-total"), 0)
+    assertEquals(0.0, metricValue(dataPlaneMetricMap, "network-io-total"), 0)
+  }
+
   // This test case is used to ensure that there will be no correctness issue after we avoid sending out full
   // UpdateMetadataRequest to all brokers in the cluster
   @Test
@@ -376,10 +403,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     var activeServers = servers.filter(s => s.config.brokerId != 2)
     // wait for the update metadata request to trickle to the brokers
     TestUtils.waitUntilTrue(() =>
-      activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
+      activeServers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
       "Topic test not created after timeout")
     assertEquals(0, partitionsRemaining.size)
-    var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    var partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get
     var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
     assertEquals(0, leaderAfterShutdown)
     assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
@@ -388,16 +415,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     partitionsRemaining = resultQueue.take().get
     assertEquals(0, partitionsRemaining.size)
     activeServers = servers.filter(s => s.config.brokerId == 0)
-    partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get
     leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
     assertEquals(0, leaderAfterShutdown)
 
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+    assertTrue(servers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
     controller.controlledShutdown(0, servers.find(_.config.brokerId == 0).get.kafkaController.brokerEpoch, controlledShutdownCallback)
     partitionsRemaining = resultQueue.take().get
     assertEquals(1, partitionsRemaining.size)
     // leader doesn't change since all the replicas are shut down
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+    assertTrue(servers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
   }
 
   @Test
@@ -585,12 +612,18 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   private def makeServers(numConfigs: Int,
                           autoLeaderRebalanceEnable: Boolean = false,
                           uncleanLeaderElectionEnable: Boolean = false,
-                          enableControlledShutdown: Boolean = true) = {
+                          enableControlledShutdown: Boolean = true,
+                          listeners : Option[String] = None,
+                          listenerSecurityProtocolMap : Option[String] = None,
+                          controlPlaneListenerName : Option[String] = None) = {
     val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect, enableControlledShutdown = enableControlledShutdown)
     configs.foreach { config =>
       config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
       config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
       config.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp, "1")
+      listeners.foreach(listener => config.setProperty(KafkaConfig.ListenersProp, listener))
+      listenerSecurityProtocolMap.foreach(listenerMap => config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap))
+      controlPlaneListenerName.foreach(controlPlaneListener => config.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListener))
     }
     configs.map(config => TestUtils.createServer(KafkaConfig.fromProps(config)))
   }
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b5983377b77..2609d9e0b79 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net._
 import java.nio.ByteBuffer
 import java.nio.channels.SocketChannel
-import java.util.{HashMap, Random}
+import java.util.{HashMap, Properties, Random}
 
 import com.yammer.metrics.core.{Gauge, Meter}
 import com.yammer.metrics.{Metrics => YammerMetrics}
@@ -134,8 +134,8 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.SendResponse(request, send, Some(request.header.toString), None))
   }
 
-  def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
-    val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
+  def connect(s: SocketServer = server, listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), localAddr: InetAddress = null, port: Int = 0) = {
+    val socket = new Socket("localhost", s.boundPort(listenerName), localAddr, port)
     sockets += socket
     socket
   }
@@ -144,13 +144,13 @@ class SocketServerTest extends JUnitSuite {
   def connectAndProcessRequest(s: SocketServer): (Socket, String) = {
     val socket = connect(s)
     val request = sendAndReceiveRequest(socket, s)
-    processRequest(s.requestChannel, request)
+    processRequest(s.dataPlaneRequestChannel, request)
     (socket, request.context.connectionId)
   }
 
   def sendAndReceiveRequest(socket: Socket, server: SocketServer): RequestChannel.Request = {
     sendRequest(socket, producerRequestBytes())
-    receiveRequest(server.requestChannel)
+    receiveRequest(server.dataPlaneRequestChannel)
   }
 
   def shutdownServerAndMetrics(server: SocketServer): Unit = {
@@ -176,15 +176,29 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def simpleRequest() {
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val plainSocket = connect()
     val serializedBytes = producerRequestBytes()
 
     // Test PLAINTEXT socket
     sendRequest(plainSocket, serializedBytes)
-    processRequest(server.requestChannel)
+    processRequest(server.dataPlaneRequestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
   }
 
+  @Test
+  def testControlPlaneRequest(): Unit = {
+    val testProps = new Properties
+    testProps.putAll(props)
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROLLER")
+    val config = KafkaConfig.fromProps(testProps)
+    withTestableServer(config, { testableServer =>
+      val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost, port = 5000)
+      sendAndReceiveControllerRequest(socket, testableServer)
+    })
+  }
+
   @Test
   def tooBigRequestIsRejected() {
     val tooManyBytes = new Array[Byte](server.config.socketRequestMaxBytes + 1)
@@ -205,41 +219,41 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testGracefulClose() {
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val plainSocket = connect()
     val serializedBytes = producerRequestBytes()
 
     for (_ <- 0 until 10)
       sendRequest(plainSocket, serializedBytes)
     plainSocket.close()
     for (_ <- 0 until 10) {
-      val request = receiveRequest(server.requestChannel)
+      val request = receiveRequest(server.dataPlaneRequestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
+      server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
     }
   }
 
   @Test
   def testNoOpAction(): Unit = {
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val plainSocket = connect()
     val serializedBytes = producerRequestBytes()
 
     for (_ <- 0 until 3)
       sendRequest(plainSocket, serializedBytes)
     for (_ <- 0 until 3) {
-      val request = receiveRequest(server.requestChannel)
+      val request = receiveRequest(server.dataPlaneRequestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
+      server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
     }
   }
 
   @Test
   def testConnectionId() {
-    val sockets = (1 to 5).map(_ => connect(protocol = SecurityProtocol.PLAINTEXT))
+    val sockets = (1 to 5).map(_ => connect())
     val serializedBytes = producerRequestBytes()
 
     val requests = sockets.map{socket =>
       sendRequest(socket, serializedBytes)
-      receiveRequest(server.requestChannel)
+      receiveRequest(server.dataPlaneRequestChannel)
     }
     requests.zipWithIndex.foreach { case (request, i) =>
       val index = request.context.connectionId.split("-").last
@@ -258,36 +272,36 @@ class SocketServerTest extends JUnitSuite {
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider)
 
     def openChannel(request: RequestChannel.Request): Option[KafkaChannel] =
-      overrideServer.processor(request.processor).channel(request.context.connectionId)
+      overrideServer.dataPlaneProcessor(request.processor).channel(request.context.connectionId)
     def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
-      overrideServer.processor(request.processor).openOrClosingChannel(request.context.connectionId)
+      overrideServer.dataPlaneProcessor(request.processor).openOrClosingChannel(request.context.connectionId)
 
     try {
       overrideServer.startup()
       val serializedBytes = producerRequestBytes()
 
       // Connection with no staged receives
-      val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
+      val socket1 = connect(overrideServer)
       sendRequest(socket1, serializedBytes)
-      val request1 = receiveRequest(overrideServer.requestChannel)
+      val request1 = receiveRequest(overrideServer.dataPlaneRequestChannel)
       assertTrue("Channel not open", openChannel(request1).nonEmpty)
       assertEquals(openChannel(request1), openOrClosingChannel(request1))
 
       time.sleep(idleTimeMs + 1)
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request1).isEmpty, "Failed to close idle channel")
       assertTrue("Channel not removed", openChannel(request1).isEmpty)
-      processRequest(overrideServer.requestChannel, request1)
+      processRequest(overrideServer.dataPlaneRequestChannel, request1)
 
       // Connection with staged receives
-      val socket2 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
+      val socket2 = connect(overrideServer)
       val request2 = sendRequestsUntilStagedReceive(overrideServer, socket2, serializedBytes)
 
       time.sleep(idleTimeMs + 1)
       TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to close idle channel")
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).nonEmpty, "Channel removed without processing staged receives")
-      processRequest(overrideServer.requestChannel, request2) // this triggers a failed send since channel has been closed
+      processRequest(overrideServer.dataPlaneRequestChannel, request2) // this triggers a failed send since channel has been closed
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, "Failed to remove channel with failed sends")
-      assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200))
+      assertNull("Received request after failed send", overrideServer.dataPlaneRequestChannel.receiveRequest(200))
 
     } finally {
       shutdownServerAndMetrics(overrideServer)
@@ -304,9 +318,9 @@ class SocketServerTest extends JUnitSuite {
     @volatile var selector: TestableSelector = null
     val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+        new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
           credentialProvider, memoryPool, new LogContext()) {
             override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
@@ -319,8 +333,8 @@ class SocketServerTest extends JUnitSuite {
       }
     }
 
-    def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId)
-    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId)
+    def openChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).channel(overrideConnectionId)
+    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).openOrClosingChannel(overrideConnectionId)
     def connectionCount = overrideServer.connectionCount(InetAddress.getByName("127.0.0.1"))
 
     // Create a client connection and wait for server to register the connection with the selector. For
@@ -361,7 +375,7 @@ class SocketServerTest extends JUnitSuite {
       assertSame(channel1, openOrClosingChannel.getOrElse(throw new RuntimeException("Channel not found")))
 
       // Complete request with failed send so that `channel1` is removed from Selector.closingChannels
-      processRequest(overrideServer.requestChannel, request)
+      processRequest(overrideServer.dataPlaneRequestChannel, request)
       TestUtils.waitUntilTrue(() => connectionCount == 0 && openOrClosingChannel.isEmpty, "Failed to remove channel with failed send")
 
       // Check that new connections can be created with the same id since `channel1` is no longer in Selector
@@ -380,14 +394,14 @@ class SocketServerTest extends JUnitSuite {
     def sendTwoRequestsReceiveOne(): RequestChannel.Request = {
       sendRequest(socket, requestBytes, flush = false)
       sendRequest(socket, requestBytes, flush = true)
-      receiveRequest(server.requestChannel)
+      receiveRequest(server.dataPlaneRequestChannel)
     }
     val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req =>
       val connectionId = req.context.connectionId
-      val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0
+      val hasStagedReceives = server.dataPlaneProcessor(0).numStagedReceives(connectionId) > 0
       if (!hasStagedReceives) {
-        processRequest(server.requestChannel, req)
-        processRequest(server.requestChannel)
+        processRequest(server.dataPlaneRequestChannel, req)
+        processRequest(server.dataPlaneRequestChannel)
       }
       hasStagedReceives
     }
@@ -403,11 +417,11 @@ class SocketServerTest extends JUnitSuite {
 
     // Mimic a primitive request handler that fetches the request from RequestChannel and place a response with a
     // throttled channel.
-    val request = receiveRequest(server.requestChannel)
+    val request = receiveRequest(server.dataPlaneRequestChannel)
     val byteBuffer = request.body[AbstractRequest].serialize(request.header)
     val send = new NetworkSend(request.context.connectionId, byteBuffer)
     def channelThrottlingCallback(response: RequestChannel.Response): Unit = {
-      server.requestChannel.sendResponse(response)
+      server.dataPlaneRequestChannel.sendResponse(response)
     }
     val throttledChannel = new ThrottledChannel(request, new MockTime(), 100, channelThrottlingCallback)
     val response =
@@ -415,7 +429,7 @@ class SocketServerTest extends JUnitSuite {
         new RequestChannel.SendResponse(request, send, Some(request.header.toString), None)
       else
         new RequestChannel.NoOpResponse(request)
-    server.requestChannel.sendResponse(response)
+    server.dataPlaneRequestChannel.sendResponse(response)
 
     // Quota manager would call notifyThrottlingDone() on throttling completion. Simulate it if throttleingInProgress is
     // false.
@@ -426,11 +440,11 @@ class SocketServerTest extends JUnitSuite {
   }
 
   def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
-    server.processor(0).openOrClosingChannel(request.context.connectionId)
+    server.dataPlaneProcessor(0).openOrClosingChannel(request.context.connectionId)
 
   @Test
   def testSendActionResponseWithThrottledChannelWhereThrottlingInProgress() {
-    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val socket = connect()
     val serializedBytes = producerRequestBytes()
     // SendAction with throttling in progress
     val request = throttledChannelTestSetUp(socket, serializedBytes, false, true)
@@ -444,7 +458,7 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() {
-    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val socket = connect()
     val serializedBytes = producerRequestBytes()
     // SendAction with throttling in progress
     val request = throttledChannelTestSetUp(socket, serializedBytes, false, false)
@@ -459,7 +473,7 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testNoOpActionResponseWithThrottledChannelWhereThrottlingInProgress() {
-    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val socket = connect()
     val serializedBytes = producerRequestBytes()
     // SendAction with throttling in progress
     val request = throttledChannelTestSetUp(socket, serializedBytes, true, true)
@@ -471,7 +485,7 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() {
-    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val socket = connect()
     val serializedBytes = producerRequestBytes()
     // SendAction with throttling in progress
     val request = throttledChannelTestSetUp(socket, serializedBytes, true, false)
@@ -485,16 +499,16 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testSocketsCloseOnShutdown() {
     // open a connection
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val plainSocket = connect()
     plainSocket.setTcpNoDelay(true)
     val bytes = new Array[Byte](40)
     // send a request first to make sure the connection has been picked up by the socket server
     sendRequest(plainSocket, bytes, Some(0))
-    processRequest(server.requestChannel)
+    processRequest(server.dataPlaneRequestChannel)
     // the following sleep is necessary to reliably detect the connection close when we send data below
     Thread.sleep(200L)
-    // make sure the sockets are open
-    server.acceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
+    // make sure the sockets are open
+    server.dataPlaneAcceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
     // then shutdown the server
     shutdownServerAndMetrics(server)
 
@@ -527,7 +541,7 @@ class SocketServerTest extends JUnitSuite {
     val conn2 = connect()
     val serializedBytes = producerRequestBytes()
     sendRequest(conn2, serializedBytes)
-    val request = server.requestChannel.receiveRequest(2000)
+    val request = server.dataPlaneRequestChannel.receiveRequest(2000)
     assertNotNull(request)
   }
 
@@ -555,7 +569,7 @@ class SocketServerTest extends JUnitSuite {
       val conn2 = connect(server)
       val serializedBytes = producerRequestBytes()
       sendRequest(conn2, serializedBytes)
-      val request = server.requestChannel.receiveRequest(2000)
+      val request = server.dataPlaneRequestChannel.receiveRequest(2000)
       assertNotNull(request)
 
       // now try to connect from the external facing interface, which should fail
@@ -583,7 +597,7 @@ class SocketServerTest extends JUnitSuite {
       // it should succeed
       val serializedBytes = producerRequestBytes()
       sendRequest(conns.last, serializedBytes)
-      val request = overrideServer.requestChannel.receiveRequest(2000)
+      val request = overrideServer.dataPlaneRequestChannel.receiveRequest(2000)
       assertNotNull(request)
 
       // now try one more (should fail)
@@ -627,7 +641,7 @@ class SocketServerTest extends JUnitSuite {
       byteBuffer.get(serializedBytes)
 
       sendRequest(sslSocket, serializedBytes)
-      processRequest(overrideServer.requestChannel)
+      processRequest(overrideServer.dataPlaneRequestChannel)
       assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
       sslSocket.close()
     } finally {
@@ -640,7 +654,7 @@ class SocketServerTest extends JUnitSuite {
     val socket = connect()
     val bytes = new Array[Byte](40)
     sendRequest(socket, bytes, Some(0))
-    assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.requestChannel).session.principal)
+    assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataPlaneRequestChannel).session.principal)
   }
 
   /* Test that we update request metrics if the client closes the connection while the broker response is in flight. */
@@ -650,9 +664,9 @@ class SocketServerTest extends JUnitSuite {
     val serverMetrics = new Metrics
     var conn: Socket = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+        new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
           credentialProvider, MemoryPool.NONE, new LogContext()) {
           override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
@@ -668,7 +682,7 @@ class SocketServerTest extends JUnitSuite {
       val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
 
-      val channel = overrideServer.requestChannel
+      val channel = overrideServer.dataPlaneRequestChannel
       val request = receiveRequest(channel)
 
       val requestMetrics = channel.metrics(request.header.apiKey.name)
@@ -696,9 +710,9 @@ class SocketServerTest extends JUnitSuite {
     @volatile var selector: TestableSelector = null
     val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+        new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
           credentialProvider, memoryPool, new LogContext()) {
           override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
@@ -711,8 +725,8 @@ class SocketServerTest extends JUnitSuite {
       }
     }
 
-    def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId)
-    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId)
+    def openChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).channel(overrideConnectionId)
+    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).openOrClosingChannel(overrideConnectionId)
 
     try {
       overrideServer.startup()
@@ -730,7 +744,7 @@ class SocketServerTest extends JUnitSuite {
       socket.close()
 
       // Complete request with socket exception so that the channel is removed from Selector.closingChannels
-      processRequest(overrideServer.requestChannel, request)
+      processRequest(overrideServer.dataPlaneRequestChannel, request)
       TestUtils.waitUntilTrue(() => openOrClosingChannel.isEmpty, "Channel not closed after failed send")
       assertTrue("Unexpected completed send", selector.completedSends.isEmpty)
     } finally {
@@ -755,10 +769,10 @@ class SocketServerTest extends JUnitSuite {
       conn = connect(overrideServer)
       val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
-      val channel = overrideServer.requestChannel
+      val channel = overrideServer.dataPlaneRequestChannel
       val request = receiveRequest(channel)
 
-      TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
+      TestUtils.waitUntilTrue(() => overrideServer.dataPlaneProcessor(request.processor).channel(request.context.connectionId).isEmpty,
         s"Idle connection `${request.context.connectionId}` was not closed by selector")
 
       val requestMetrics = channel.metrics(request.header.apiKey.name)
@@ -780,10 +794,10 @@ class SocketServerTest extends JUnitSuite {
     server.stopProcessingRequests()
     val version = ApiKeys.PRODUCE.latestVersion
     val version2 = (version - 1).toShort
-    for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
-    server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
-    assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
-    server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
+    for (_ <- 0 to 1) server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
+    server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
+    assertEquals(2, server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
+    server.dataPlaneRequestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
     val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2,
         s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1,
         "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1)
@@ -808,7 +822,7 @@ class SocketServerTest extends JUnitSuite {
       .allMetrics.asScala
       .filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
       .collect { case (k, metric: Gauge[_]) => (k, metric.value().asInstanceOf[Double]) }
-      .filter { case (_, value) => value != 0.0 }
+      .filter { case (_, value) => value != 0.0 && !value.equals(Double.NaN) }
 
     assertEquals(Map.empty, nonZeroMetricNamesAndValues)
   }
@@ -818,7 +832,7 @@ class SocketServerTest extends JUnitSuite {
     val kafkaMetricNames = metrics.metrics.keySet.asScala.filter(_.tags.asScala.get("listener").nonEmpty)
     assertFalse(kafkaMetricNames.isEmpty)
 
-    val expectedListeners = Set("PLAINTEXT", "TRACE")
+    val expectedListeners = Set("PLAINTEXT")
     kafkaMetricNames.foreach { kafkaMetricName =>
       assertTrue(expectedListeners.contains(kafkaMetricName.tags.get("listener")))
     }
@@ -846,7 +860,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def configureNewConnectionException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
 
       testableSelector.updateMinWakeup(2)
@@ -856,7 +870,7 @@ class SocketServerTest extends JUnitSuite {
       TestUtils.waitUntilTrue(() => testableServer.connectionCount(localAddress) == 1, "Failed channel not removed")
 
       assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
-    }
+    })
   }
 
   /**
@@ -871,7 +885,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def processNewResponseException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
       testableSelector.updateMinWakeup(2)
 
@@ -879,12 +893,12 @@ class SocketServerTest extends JUnitSuite {
       sockets.foreach(sendRequest(_, producerRequestBytes()))
 
       testableServer.testableSelector.addFailure(SelectorOperation.Send)
-      sockets.foreach(_ => processRequest(testableServer.requestChannel))
+      sockets.foreach(_ => processRequest(testableServer.dataPlaneRequestChannel))
       testableSelector.waitForOperations(SelectorOperation.Send, 2)
       testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true)
 
       assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
-    }
+    })
   }
 
   /**
@@ -894,13 +908,13 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def sendCancelledKeyException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
       testableSelector.updateMinWakeup(2)
 
       val sockets = (1 to 2).map(_ => connect(testableServer))
       sockets.foreach(sendRequest(_, producerRequestBytes()))
-      val requestChannel = testableServer.requestChannel
+      val requestChannel = testableServer.dataPlaneRequestChannel
 
       val requests = sockets.map(_ => receiveRequest(requestChannel))
       val failedConnectionId = requests(0).context.connectionId
@@ -912,7 +926,7 @@ class SocketServerTest extends JUnitSuite {
 
       val successfulSocket = if (isSocketConnectionId(failedConnectionId, sockets(0))) sockets(1) else sockets(0)
       assertProcessorHealthy(testableServer, Seq(successfulSocket))
-    }
+    })
   }
 
   /**
@@ -922,7 +936,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def closingChannelException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
       testableSelector.updateMinWakeup(2)
 
@@ -933,13 +947,13 @@ class SocketServerTest extends JUnitSuite {
 
       testableSelector.addFailure(SelectorOperation.Send)
       sockets(0).close()
-      processRequest(testableServer.requestChannel, request)
-      processRequest(testableServer.requestChannel) // Also process request from other channel
+      processRequest(testableServer.dataPlaneRequestChannel, request)
+      processRequest(testableServer.dataPlaneRequestChannel) // Also process request from other channel
       testableSelector.waitForOperations(SelectorOperation.Send, 2)
       testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = true)
 
       assertProcessorHealthy(testableServer, Seq(sockets(1)))
-    }
+    })
   }
 
   /**
@@ -954,10 +968,10 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def processCompletedReceiveException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val sockets = (1 to 2).map(_ => connect(testableServer))
       val testableSelector = testableServer.testableSelector
-      val requestChannel = testableServer.requestChannel
+      val requestChannel = testableServer.dataPlaneRequestChannel
 
       testableSelector.cachedCompletedReceives.minPerPoll = 2
       testableSelector.addFailure(SelectorOperation.Mute)
@@ -968,7 +982,7 @@ class SocketServerTest extends JUnitSuite {
       requests.foreach(processRequest(requestChannel, _))
 
       assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
-    }
+    })
   }
 
   /**
@@ -983,18 +997,18 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def processCompletedSendException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
       val sockets = (1 to 2).map(_ => connect(testableServer))
       val requests = sockets.map(sendAndReceiveRequest(_, testableServer))
 
       testableSelector.addFailure(SelectorOperation.Unmute)
-      requests.foreach(processRequest(testableServer.requestChannel, _))
+      requests.foreach(processRequest(testableServer.dataPlaneRequestChannel, _))
       testableSelector.waitForOperations(SelectorOperation.Unmute, 2)
       testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true)
 
       assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
-    }
+    })
   }
 
   /**
@@ -1007,7 +1021,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def processDisconnectedException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val (socket, connectionId) = connectAndProcessRequest(testableServer)
       val testableSelector = testableServer.testableSelector
 
@@ -1021,7 +1035,7 @@ class SocketServerTest extends JUnitSuite {
       testableServer.waitForChannelClose(connectionId, locallyClosed = false)
 
       assertProcessorHealthy(testableServer)
-    }
+    })
   }
 
   /**
@@ -1029,7 +1043,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def pollException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val (socket, _) = connectAndProcessRequest(testableServer)
       val testableSelector = testableServer.testableSelector
 
@@ -1038,7 +1052,7 @@ class SocketServerTest extends JUnitSuite {
       testableSelector.waitForOperations(SelectorOperation.Poll, 2)
 
       assertProcessorHealthy(testableServer, Seq(socket))
-    }
+    })
   }
 
   /**
@@ -1046,7 +1060,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def controlThrowable(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       connectAndProcessRequest(testableServer)
       val testableSelector = testableServer.testableSelector
 
@@ -1056,12 +1070,12 @@ class SocketServerTest extends JUnitSuite {
       testableSelector.waitForOperations(SelectorOperation.Poll, 1)
 
       testableSelector.waitForOperations(SelectorOperation.CloseSelector, 1)
-    }
+    })
   }
 
-  private def withTestableServer(testWithServer: TestableSocketServer => Unit): Unit = {
+  private def withTestableServer(config : KafkaConfig = config, testWithServer: TestableSocketServer => Unit): Unit = {
     props.put("listeners", "PLAINTEXT://localhost:0")
-    val testableServer = new TestableSocketServer
+    val testableServer = new TestableSocketServer(config)
     testableServer.startup()
     try {
         testWithServer(testableServer)
@@ -1070,10 +1084,15 @@ class SocketServerTest extends JUnitSuite {
     }
   }
 
+  def sendAndReceiveControllerRequest(socket: Socket, server: SocketServer): RequestChannel.Request = {
+    sendRequest(socket, producerRequestBytes())
+    receiveRequest(server.controlPlaneRequestChannelOpt.get)
+  }
+
   private def assertProcessorHealthy(testableServer: TestableSocketServer, healthySockets: Seq[Socket] = Seq.empty): Unit = {
     val selector = testableServer.testableSelector
     selector.reset()
-    val requestChannel = testableServer.requestChannel
+    val requestChannel = testableServer.dataPlaneRequestChannel
 
     // Check that existing channels behave as expected
     healthySockets.foreach { socket =>
@@ -1096,18 +1115,17 @@ class SocketServerTest extends JUnitSuite {
   def isSocketConnectionId(connectionId: String, socket: Socket): Boolean =
     connectionId.contains(s":${socket.getLocalPort}-")
 
-  class TestableSocketServer extends SocketServer(KafkaConfig.fromProps(props),
+  class TestableSocketServer(config : KafkaConfig = config) extends SocketServer(config,
       new Metrics, Time.SYSTEM, credentialProvider) {
 
     @volatile var selector: Option[TestableSelector] = None
 
-    override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+    override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
       new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
         config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) {
         override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
-           val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
-           assertEquals(None, selector)
+           val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala)
            selector = Some(testableSelector)
            testableSelector
         }
@@ -1131,7 +1149,7 @@ class SocketServerTest extends JUnitSuite {
       val openCount = selector.allChannels.size - 1 // minus one for the channel just closed above
       TestUtils.waitUntilTrue(() => connectionCount(localAddress) == openCount, "Connection count not decremented")
       TestUtils.waitUntilTrue(() =>
-        processor(0).inflightResponseCount == 0, "Inflight responses not cleared")
+        dataPlaneProcessor(0).inflightResponseCount == 0, "Inflight responses not cleared")
       assertNull("Channel not removed", selector.channel(connectionId))
       assertNull("Closing channel not removed", selector.closingChannel(connectionId))
     }
@@ -1149,9 +1167,9 @@ class SocketServerTest extends JUnitSuite {
     case object CloseSelector extends SelectorOperation
   }
 
-  class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics)
+  class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics, metricTags: mutable.Map[String, String] = mutable.Map.empty)
         extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
-            metrics, time, "socket-server", new HashMap, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
+            metrics, time, "socket-server", metricTags.asJava, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
 
     val failures = mutable.Map[SelectorOperation, Exception]()
     val operationCounts = mutable.Map[SelectorOperation, Int]().withDefaultValue(0)
@@ -1302,4 +1320,4 @@ class SocketServerTest extends JUnitSuite {
       sockets.filterNot(socket => isSocketConnectionId(failedConnectionId, socket))
     }
   }
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 789dbaeb8f7..f5cc1342970 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -93,7 +93,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000")
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000")
 
-    val quotaManagers = servers.head.apis.quotas
+    val quotaManagers = servers.head.dataPlaneRequestProcessor.quotas
     rootEntityType match {
       case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props)
       case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props)
@@ -179,7 +179,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Remove config change znodes to force quota initialization only through loading of user/client quotas
     zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p => zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) }
     server.startup()
-    val quotaManagers = server.apis.quotas
+    val quotaManagers = server.dataPlaneRequestProcessor.quotas
 
     assertEquals(Quota.upperBound(1000),  quotaManagers.produce.quota("someuser", "overriddenClientId"))
     assertEquals(Quota.upperBound(2000),  quotaManagers.fetch.quota("someuser", "overriddenClientId"))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 10dba1a4793..f21f38427b2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -229,6 +229,31 @@ class KafkaConfigTest {
     assertFalse(isValidKafkaConfig(props))
   }
 
+  @Test
+  def testControlPlaneListenerName() = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.put("control.plane.listener.name", "CONTROLLER")
+    assertTrue(isValidKafkaConfig(props))
+
+    val serverConfig = KafkaConfig.fromProps(props)
+    val controlEndpoint = serverConfig.controlPlaneListener.get
+    assertEquals("localhost", controlEndpoint.host)
+    assertEquals(5000, controlEndpoint.port)
+    assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol)
+
+    //advertised listener should contain control-plane listener
+    val advertisedEndpoints = serverConfig.advertisedListeners
+    assertFalse(advertisedEndpoints.filter { endpoint =>
+      endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value())
+    }.isEmpty)
+
+    // interBrokerListener name should be different from control-plane listener name
+    val interBrokerListenerName = serverConfig.interBrokerListenerName
+    assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value()))
+  }
+
   @Test
   def testBadListenerProtocol() {
     val props = new Properties()
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 4e60a8f6027..ef3dece3063 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -74,7 +74,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)
     TestUtils.waitUntilTrue(() => {
       val metadataResponse2 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
-      metadataResponse2.controller != null && controllerServer2.apis.brokerId == metadataResponse2.controller.id
+      metadataResponse2.controller != null && controllerServer2.dataPlaneRequestProcessor.brokerId == metadataResponse2.controller.id
     }, "Controller id should match the active controller after failover", 5000)
   }
 
@@ -178,7 +178,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(topic1, topicMetadata1.topic)
     assertEquals(Errors.INVALID_TOPIC_EXCEPTION, topicMetadata2.error)
     assertEquals(topic2, topicMetadata2.topic)
-    
+
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0)
 
@@ -250,7 +250,7 @@ class MetadataRequestTest extends BaseRequestTest {
     val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
     val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     val downNode = servers.find { server =>
-      val serverId = server.apis.brokerId
+      val serverId = server.dataPlaneRequestProcessor.brokerId
       val leaderId = partitionMetadata.leader.id
       val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
       serverId != leaderId && replicaIds.contains(serverId)
@@ -260,7 +260,7 @@ class MetadataRequestTest extends BaseRequestTest {
     TestUtils.waitUntilTrue(() => {
       val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
       val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
-      val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get
+      val replica = metadata.replicas.asScala.find(_.id == downNode.dataPlaneRequestProcessor.brokerId).get
       replica.host == "" & replica.port == -1
     }, "Replica was not found down", 5000)
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ae8353657a6..c9e4e7890eb 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -99,7 +99,7 @@ class RequestQuotaTest extends BaseRequestTest {
     adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
 
     TestUtils.retry(10000) {
-      val quotaManager = servers.head.apis.quotas.request
+      val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request
       assertEquals(s"Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"))
       assertEquals(s"Request quota override not set", Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId))
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e5ea6a4baae..ecdb8c8739c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -835,7 +835,7 @@ object TestUtils extends Logging {
                                           timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
     val expectedBrokerIds = servers.map(_.config.brokerId).toSet
     TestUtils.waitUntilTrue(() => servers.forall(server =>
-      expectedBrokerIds == server.apis.metadataCache.getAliveBrokers.map(_.id).toSet
+      expectedBrokerIds == server.dataPlaneRequestProcessor.metadataCache.getAliveBrokers.map(_.id).toSet
     ), "Timed out waiting for broker metadata to propagate to all servers", timeout)
   }
 
@@ -855,7 +855,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(() =>
       servers.foldLeft(true) {
         (result, server) =>
-          val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
+          val partitionStateOpt = server.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic, partition)
           partitionStateOpt match {
             case None => false
             case Some(partitionState) =>
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index b15f8ac47b5..c120caa4581 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -313,8 +313,8 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
     // Test that the existing clientId overrides are read
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
     servers = Seq(server)
-    assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
-    assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId))
   }
 
   @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 8a2122d8e37..efd0d49be44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -517,7 +517,7 @@ public static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers
                                                      final long timeout) throws InterruptedException {
         TestUtils.waitForCondition(() -> {
             for (final KafkaServer server : servers) {
-                final MetadataCache metadataCache = server.apis().metadataCache();
+                final MetadataCache metadataCache = server.dataPlaneRequestProcessor().metadataCache();
                 final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
                         metadataCache.getPartitionInfo(topic, partition);
                 if (partitionInfo.isEmpty()) {


With regards,
Apache Git Services

Mime
View raw message