kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2614; No more clients can connect after `TooManyConnectionsException` threshold (max.connections.per.ip) is reached
Date Sat, 10 Oct 2015 22:24:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c67ca6588 -> 1265d7cb7


KAFKA-2614; No more clients can connect after `TooManyConnectionsException` threshold (max.connections.per.ip) is reached

* Call `ConnectionQuotas.decr` when calling `Selector.close` and when disconnections happen.
* Expand `SocketServerTest` to test for this and to close sockets.
* Refactor and clean-up `SocketServer` and `Acceptor` to make the code easier to understand.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #288 from ijuma/kafka-2614-connection-count-not-updated


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1265d7cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1265d7cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1265d7cb

Branch: refs/heads/trunk
Commit: 1265d7cb7f56b5f5aad940250dc705a9a060a6f0
Parents: c67ca65
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Sat Oct 10 15:24:36 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Sat Oct 10 15:24:36 2015 -0700

----------------------------------------------------------------------
 .../kafka/common/network/KafkaChannel.java      |  20 +-
 .../apache/kafka/common/network/Selectable.java |   2 +-
 .../apache/kafka/common/network/Selector.java   |  41 +--
 .../common/network/SSLTransportLayerTest.java   |   8 +-
 .../scala/kafka/cluster/BrokerEndPoint.scala    |  24 +-
 .../main/scala/kafka/network/SocketServer.scala | 283 ++++++++++---------
 .../unit/kafka/network/SocketServerTest.scala   |  59 ++--
 7 files changed, 256 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1265d7cb/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 28a4f41..172f4cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.network;
 
 import java.io.IOException;
 
+import java.net.InetAddress;
 import java.net.Socket;
 import java.nio.channels.SelectionKey;
 
@@ -108,14 +109,21 @@ public class KafkaChannel {
         return send != null;
     }
 
+    /**
+     * Returns the address to which this channel's socket is connected or `null` if the socket
has never been connected.
+     *
+     * If the socket was connected prior to being closed, then this method will continue
to return the
+     * connected address after the socket is closed.
+     */
+    public InetAddress socketAddress() {
+        return transportLayer.socketChannel().socket().getInetAddress();
+    }
+
     public String socketDescription() {
         Socket socket = transportLayer.socketChannel().socket();
-        if (socket == null)
-            return "[unconnected socket]";
-        else if (socket.getInetAddress() != null)
-            return socket.getInetAddress().toString();
-        else
+        if (socket.getInetAddress() == null)
             return socket.getLocalAddress().toString();
+        return socket.getInetAddress().toString();
     }
 
     public void setSend(Send send) {
@@ -132,7 +140,7 @@ public class KafkaChannel {
             receive = new NetworkReceive(maxReceiveSize, id);
         }
 
-        long x = receive(receive);
+        receive(receive);
         if (receive.complete()) {
             receive.payload().rewind();
             result = receive;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1265d7cb/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 629fa0d..10ca632 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -50,7 +50,7 @@ public interface Selectable {
     /**
      * Close the connection identified by the given id
      */
-    public void close(String nodeId);
+    public void close(String id);
 
     /**
      * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls

http://git-wip-us.apache.org/repos/asf/kafka/blob/1265d7cb/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 7cdc167..e1e5b4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -207,10 +207,7 @@ public class Selector implements Selectable {
      * @param send The request to send
      */
     public void send(Send send) {
-        KafkaChannel channel = channelForId(send.destination());
-        if (channel == null)
-            throw new IllegalStateException("channel is not connected");
-
+        KafkaChannel channel = channelOrFail(send.destination());
         try {
             channel.setSend(send);
         } catch (CancelledKeyException e) {
@@ -243,6 +240,7 @@ public class Selector implements Selectable {
      * @throws IllegalArgumentException If `timeout` is negative
      * @throws IllegalStateException If a send is given for which we have no existing connection
or for which there is
      *         already an in-progress send
+     * @throws InvalidReceiveException If invalid data is received
      */
     @Override
     public void poll(long timeout) throws IOException {
@@ -351,7 +349,7 @@ public class Selector implements Selectable {
 
     @Override
     public void mute(String id) {
-        KafkaChannel channel = channelForId(id);
+        KafkaChannel channel = channelOrFail(id);
         mute(channel);
     }
 
@@ -361,7 +359,7 @@ public class Selector implements Selectable {
 
     @Override
     public void unmute(String id) {
-        KafkaChannel channel = channelForId(id);
+        KafkaChannel channel = channelOrFail(id);
         unmute(channel);
     }
 
@@ -433,8 +431,7 @@ public class Selector implements Selectable {
     }
 
     /**
-     * Begin closing this connection
-     * @param id channel id
+     * Close the connection identified by the given id
      */
     public void close(String id) {
         KafkaChannel channel = this.channels.get(id);
@@ -463,23 +460,35 @@ public class Selector implements Selectable {
      */
     @Override
     public boolean isChannelReady(String id) {
-        KafkaChannel channel = channelForId(id);
+        KafkaChannel channel = this.channels.get(id);
+        if (channel == null)
+            return false;
         return channel.ready();
     }
 
-    /**
-     * Get the channel associated with this connection
-     * Exposing this to allow SocketServer get the Principal from the channel when creating
a request
-     * without making Selector know about Principals
-     */
-    public KafkaChannel channelForId(String id) {
+    private KafkaChannel channelOrFail(String id) {
         KafkaChannel channel = this.channels.get(id);
         if (channel == null)
-            throw new IllegalStateException("Attempt to write to socket for which there is
no open connection. Connection id " + id + " existing connections " + channels.keySet().toString());
+            throw new IllegalStateException("Attempt to retrieve channel for which there
is no open connection. Connection id " + id + " existing connections " + channels.keySet().toString());
         return channel;
     }
 
     /**
+     * Return the selector channels.
+     */
+    public List<KafkaChannel> channels() {
+        return new ArrayList<>(channels.values());
+    }
+
+    /**
+     * Return the channel associated with this connection or `null` if there is no channel
associated with the
+     * connection.
+     */
+    public KafkaChannel channel(String id) {
+        return this.channels.get(id);
+    }
+
+    /**
      * Get the channel associated with selectionKey
      */
     private KafkaChannel channel(SelectionKey key) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1265d7cb/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
index 43da621..6993f52 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
@@ -418,9 +418,7 @@ public class SSLTransportLayerTest {
         boolean closed = false;
         for (int i = 0; i < 30; i++) {
             selector.poll(1000L);
-            try {
-                selector.channelForId(node);
-            } catch (IllegalStateException e) {
+            if (selector.channel(node) == null) {
                 closed = true;
                 break;
             }
@@ -581,7 +579,7 @@ public class SSLTransportLayerTest {
                     newChannels.clear();
                     while (true) {
                         NetworkSend send = inflightSends.peek();
-                        if (send != null && !selector.channelForId(send.destination()).hasSend())
{
+                        if (send != null && !selector.channel(send.destination()).hasSend())
{
                             send = inflightSends.poll();
                             selector.send(send);
                         } else
@@ -590,7 +588,7 @@ public class SSLTransportLayerTest {
                     List<NetworkReceive> completedReceives = selector.completedReceives();
                     for (NetworkReceive rcv : completedReceives) {
                         NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
-                        if (!selector.channelForId(send.destination()).hasSend())
+                        if (!selector.channel(send.destination()).hasSend())
                             selector.send(send);
                         else
                             inflightSends.add(send);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1265d7cb/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
index 3395108..75efd77 100644
--- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -23,15 +23,27 @@ import kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils._
 
 object BrokerEndPoint {
-  def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndPoint = {
 
-    // BrokerEndPoint URI is host:port or [ipv6_host]:port
-    // Note that unlike EndPoint (or listener) this URI has no security information.
-    val uriParseExp = """\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r
+  private val uriParseExp = """\[?([0-9a-z\-.:]*)\]?:([0-9]+)""".r
 
+  /**
+   * BrokerEndPoint URI is host:port or [ipv6_host]:port
+   * Note that unlike EndPoint (or listener) this URI has no security information.
+   */
+  def parseHostPort(connectionString: String): Option[(String, Int)] = {
     connectionString match {
-      case uriParseExp(host, port) => new BrokerEndPoint(brokerId, host, port.toInt)
-      case _ => throw new KafkaException("Unable to parse " + connectionString + " to
a broker endpoint")
+      case uriParseExp(host, port) => try Some(host, port.toInt) catch { case e: NumberFormatException
=> None }
+      case _ => None
+    }
+  }
+  
+  /**
+   * BrokerEndPoint URI is host:port or [ipv6_host]:port
+   * Note that unlike EndPoint (or listener) this URI has no security information.
+   */
+  def createBrokerEndPoint(brokerId: Int, connectionString: String): BrokerEndPoint = {
+    parseHostPort(connectionString).map { case (host, port) => new BrokerEndPoint(brokerId,
host, port) }.getOrElse {
+      throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1265d7cb/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 57ee318..ecceb97 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -20,26 +20,28 @@ package kafka.network
 import java.io.IOException
 import java.net._
 import java.nio.channels._
+import java.nio.channels.{Selector => NSelector}
 import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 
 import com.yammer.metrics.core.Gauge
-import kafka.cluster.EndPoint
+import kafka.cluster.{BrokerEndPoint, EndPoint}
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, InvalidReceiveException, ChannelBuilder,
PlaintextChannelBuilder, SSLChannelBuilder}
+import org.apache.kafka.common.network.{Selector => KSelector, ChannelBuilders, InvalidReceiveException}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
-import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
+import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.collection._
+import JavaConverters._
 import scala.util.control.{NonFatal, ControlThrowable}
 
 /**
@@ -50,26 +52,21 @@ import scala.util.control.{NonFatal, ControlThrowable}
  */
 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends
Logging with KafkaMetricsGroup {
 
-  val channelConfigs = config.channelConfigs
+  private val endpoints = config.listeners
+  private val numProcessorThreads = config.numNetworkThreads
+  private val maxQueuedRequests = config.queuedMaxRequests
+  private val totalProcessorThreads = numProcessorThreads * endpoints.size
 
-  val endpoints = config.listeners
-  val numProcessorThreads = config.numNetworkThreads
-  val maxQueuedRequests = config.queuedMaxRequests
-  val sendBufferSize = config.socketSendBufferBytes
-  val recvBufferSize = config.socketReceiveBufferBytes
-  val maxRequestSize = config.socketRequestMaxBytes
-  val maxConnectionsPerIp = config.maxConnectionsPerIp
-  val connectionsMaxIdleMs = config.connectionsMaxIdleMs
-  val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
-  val totalProcessorThreads = numProcessorThreads * endpoints.size
+  private val maxConnectionsPerIp = config.maxConnectionsPerIp
+  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
 
   this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
 
   val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
   private val processors = new Array[Processor](totalProcessorThreads)
 
-  private[network] var acceptors =  mutable.Map[EndPoint,Acceptor]()
-
+  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
+  private var connectionQuotas: ConnectionQuotas = _
 
   private val allMetricNames = (0 until totalProcessorThreads).map { i =>
     val tags = new util.HashMap[String, String]()
@@ -77,30 +74,47 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
     new MetricName("io-wait-ratio", "socket-server-metrics", tags)
   }
 
-  /* I'm pushing the mapping of port-to-protocol to the processor level,
-     so the processor can put the correct protocol in the request channel.
-     we'll probably have a more elegant way of doing this once we patch the request channel
-     to include more information about security and authentication.
-     TODO: re-consider this code when working on KAFKA-1683
-   */
-  private val portToProtocol: ConcurrentHashMap[Int, SecurityProtocol] = new ConcurrentHashMap[Int,
SecurityProtocol]()
-
   /**
    * Start the socket server
    */
   def startup() {
-    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
-
     this.synchronized {
+
+      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
+
+      val channelConfigs = config.channelConfigs
+      val sendBufferSize = config.socketSendBufferBytes
+      val recvBufferSize = config.socketReceiveBufferBytes
+      val maxRequestSize = config.socketRequestMaxBytes
+      val connectionsMaxIdleMs = config.connectionsMaxIdleMs
+      val brokerId = config.brokerId
+
       var processorBeginIndex = 0
-      endpoints.values.foreach(endpoint => {
-        val acceptor = new Acceptor(endpoint.host, endpoint.port, sendBufferSize, recvBufferSize,
config.brokerId, requestChannel, processors, processorBeginIndex, numProcessorThreads, quotas,
-          endpoint.protocolType, portToProtocol, channelConfigs,  maxQueuedRequests, maxRequestSize,
connectionsMaxIdleMs, metrics, allMetricNames, time)
+      endpoints.values.foreach { endpoint =>
+        val protocol = endpoint.protocolType
+        val processorEndIndex = processorBeginIndex + numProcessorThreads
+
+        for (i <- processorBeginIndex until processorEndIndex) {
+          processors(i) = new Processor(i,
+            time,
+            maxRequestSize,
+            requestChannel,
+            connectionQuotas,
+            connectionsMaxIdleMs,
+            protocol,
+            channelConfigs,
+            metrics
+          )
+        }
+
+        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
+          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
         acceptors.put(endpoint, acceptor)
-        Utils.newThread("kafka-socket-acceptor-%s-%d".format(endpoint.protocolType.toString,
endpoint.port), acceptor, false).start()
-        acceptor.awaitStartup
-        processorBeginIndex += numProcessorThreads
-      })
+        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port),
acceptor, false).start()
+        acceptor.awaitStartup()
+
+        processorBeginIndex = processorEndIndex
+      }
     }
 
     newGauge("NetworkProcessorAvgIdlePercent",
@@ -136,7 +150,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val
time: Time
     }
   }
 
+  /* For test usage */
+  private[network] def connectionCount(address: InetAddress): Int =
+    Option(connectionQuotas).fold(0)(_.get(address))
+
 }
+
 /**
  * A base class with some helper variables and methods
  */
@@ -180,19 +199,24 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas:
ConnectionQ
   protected def isRunning = alive.get
 
   /**
-   * Close the given key and associated socket
+   * Close the connection identified by `connectionId` and decrement the connection count.
    */
-  def close(key: SelectionKey) {
-    if(key != null) {
-      key.attach(null)
-      close(key.channel.asInstanceOf[SocketChannel])
-      swallowError(key.cancel())
+  def close(selector: KSelector, connectionId: String) {
+    val channel = selector.channel(connectionId)
+    if (channel != null) {
+      debug(s"Closing selector connection $connectionId")
+      val address = channel.socketAddress
+      if (address != null)
+        connectionQuotas.dec(address)
+      selector.close(connectionId)
     }
   }
 
+  /**
+   * Close `channel` and decrement the connection count.
+   */
   def close(channel: SocketChannel) {
-
-    if(channel != null) {
+    if (channel != null) {
       debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
       connectionQuotas.dec(channel.socket.getInetAddress)
       swallowError(channel.socket().close())
@@ -202,47 +226,21 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas:
ConnectionQ
 }
 
 /**
- * Thread that accepts and configures new connections. There is only need for one of these
+ * Thread that accepts and configures new connections. There is one of these per endpoint.
  */
-private[kafka] class Acceptor(val host: String,
-                              private val port: Int,
+private[kafka] class Acceptor(val endPoint: EndPoint,
                               val sendBufferSize: Int,
                               val recvBufferSize: Int,
                               brokerId: Int,
-                              requestChannel: RequestChannel,
                               processors: Array[Processor],
-                              processorBeginIndex: Int,
-                              numProcessorThreads: Int,
-                              connectionQuotas: ConnectionQuotas,
-                              protocol: SecurityProtocol,
-                              portToProtocol: ConcurrentHashMap[Int, SecurityProtocol],
-                              channelConfigs: java.util.Map[String, Object],
-                              maxQueuedRequests: Int,
-                              maxRequestSize: Int,
-                              connectionsMaxIdleMs: Long,
-                              metrics: Metrics,
-                              allMetricNames: Seq[MetricName],
-                              time: Time) extends AbstractServerThread(connectionQuotas)
with KafkaMetricsGroup {
-  val nioSelector = java.nio.channels.Selector.open()
-  val serverChannel = openServerSocket(host, port)
-  val processorEndIndex = processorBeginIndex + numProcessorThreads
-
-  portToProtocol.put(serverChannel.socket().getLocalPort, protocol)
+                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas)
with KafkaMetricsGroup {
+
+  private val nioSelector = NSelector.open()
+  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
 
   this.synchronized {
-    for (i <- processorBeginIndex until processorEndIndex) {
-        processors(i) = new Processor(i,
-          time,
-          maxRequestSize,
-          numProcessorThreads,
-          requestChannel,
-          connectionQuotas,
-          connectionsMaxIdleMs,
-          protocol,
-          channelConfigs,
-          metrics
-          )
-        Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, protocol.name, i),
processors(i), false).start()
+    processors.zipWithIndex.foreach { case (processor, i) =>
+      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString,
i), processor, false).start()
     }
   }
 
@@ -250,10 +248,10 @@ private[kafka] class Acceptor(val host: String,
    * Accept loop that checks for new connection attempts
    */
   def run() {
-    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
+    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
     startupComplete()
-    var currentProcessor = processorBeginIndex
     try {
+      var currentProcessor = 0
       while (isRunning) {
         try {
           val ready = nioSelector.select(500)
@@ -261,9 +259,8 @@ private[kafka] class Acceptor(val host: String,
             val keys = nioSelector.selectedKeys()
             val iter = keys.iterator()
             while (iter.hasNext && isRunning) {
-              var key: SelectionKey = null
               try {
-                key = iter.next
+                val key = iter.next
                 iter.remove()
                 if (key.isAcceptable)
                   accept(key, processors(currentProcessor))
@@ -271,8 +268,7 @@ private[kafka] class Acceptor(val host: String,
                   throw new IllegalStateException("Unrecognized key state for acceptor thread.")
 
                 // round robin to the next processor thread
-                currentProcessor = (currentProcessor + 1) % processorEndIndex
-                if (currentProcessor < processorBeginIndex) currentProcessor = processorBeginIndex
+                currentProcessor = (currentProcessor + 1) % processors.length
               } catch {
                 case e: Throwable => error("Error while accepting connection", e)
               }
@@ -298,7 +294,7 @@ private[kafka] class Acceptor(val host: String,
   /*
    * Create a server socket to listen for connections on.
    */
-  def openServerSocket(host: String, port: Int): ServerSocketChannel = {
+  private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
     val socketAddress =
       if(host == null || host.trim.isEmpty)
         new InetSocketAddress(port)
@@ -353,18 +349,32 @@ private[kafka] class Acceptor(val host: String,
 
 /**
  * Thread that processes all requests from a single connection. There are N of these running
in parallel
- * each of which has its own selectors
+ * each of which has its own selector
  */
 private[kafka] class Processor(val id: Int,
-                               val time: Time,
-                               val maxRequestSize: Int,
-                               val totalProcessorThreads: Int,
-                               val requestChannel: RequestChannel,
+                               time: Time,
+                               maxRequestSize: Int,
+                               requestChannel: RequestChannel,
                                connectionQuotas: ConnectionQuotas,
-                               val connectionsMaxIdleMs: Long,
-                               val protocol: SecurityProtocol,
-                               val channelConfigs: java.util.Map[String, Object],
-                               val metrics: Metrics) extends AbstractServerThread(connectionQuotas)
with KafkaMetricsGroup {
+                               connectionsMaxIdleMs: Long,
+                               protocol: SecurityProtocol,
+                               channelConfigs: java.util.Map[String, Object],
+                               metrics: Metrics) extends AbstractServerThread(connectionQuotas)
with KafkaMetricsGroup {
+
+  private object ConnectionId {
+    def fromString(s: String): Option[ConnectionId] = s.split("-") match {
+      case Array(local, remote) => BrokerEndPoint.parseHostPort(local).flatMap { case
(localHost, localPort) =>
+        BrokerEndPoint.parseHostPort(remote).map { case (remoteHost, remotePort) =>
+          ConnectionId(localHost, localPort, remoteHost, remotePort)
+        }
+      }
+      case _ => None
+    }
+  }
+
+  private case class ConnectionId(localHost: String, localPort: Int, remoteHost: String,
remotePort: Int) {
+    override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort"
+  }
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
@@ -379,10 +389,10 @@ private[kafka] class Processor(val id: Int,
         metrics.metrics().get(new MetricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
       }
     },
-    JavaConversions.mapAsScalaMap(metricTags)
+    metricTags.asScala
   )
 
-  private val selector = new org.apache.kafka.common.network.Selector(
+  private val selector = new KSelector(
     maxRequestSize,
     connectionsMaxIdleMs,
     metrics,
@@ -404,38 +414,46 @@ private[kafka] class Processor(val id: Int,
         try {
           selector.poll(300)
         } catch {
-          case e @ (_: IllegalStateException | _: IOException) => {
+          case e @ (_: IllegalStateException | _: IOException) =>
             error("Closing processor %s due to illegal state or IO exception".format(id))
             swallow(closeAll())
             shutdownComplete()
             throw e
-          }
           case e: InvalidReceiveException =>
             // Log warning and continue since Selector already closed the connection
             warn("Connection was closed due to invalid receive. Processor will continue handling
other connections")
         }
-        collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive
=> {
+        selector.completedReceives.asScala.foreach { receive =>
           try {
-
-            val channel = selector.channelForId(receive.source);
+            val channel = selector.channel(receive.source)
             val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
channel.principal().getName), channel.socketDescription)
             val req = RequestChannel.Request(processor = id, connectionId = receive.source,
session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol
= protocol)
             requestChannel.sendRequest(req)
           } catch {
-            case e @ (_: InvalidRequestException | _: SchemaException) => {
+            case e @ (_: InvalidRequestException | _: SchemaException) =>
               // note that even though we got an exception, we can assume that receive.source
is valid. Issues with constructing a valid receive object were handled earlier
               error("Closing socket for " + receive.source + " because of error", e)
-              selector.close(receive.source)
-            }
+              close(selector, receive.source)
           }
           selector.mute(receive.source)
-        })
+        }
 
-        collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach(send
=> {
-          val resp = inflightResponses.remove(send.destination()).get
+        selector.completedSends.asScala.foreach { send =>
+          val resp = inflightResponses.remove(send.destination()).getOrElse {
+            throw new IllegalStateException(s"Send for ${send.destination} completed, but
not in `inflightResponses`")
+          }
           resp.request.updateRequestMetrics()
           selector.unmute(send.destination())
-        })
+        }
+
+        selector.disconnected.asScala.foreach { connectionId =>
+          val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
+            throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
+          }.remoteHost
+          // the channel has been closed by the selector but the quotas still need to be
updated
+          connectionQuotas.dec(InetAddress.getByName(remoteHost))
+        }
+
       } catch {
         // We catch all the throwables here to prevent the processor thread from exiting.
We do this because
         // letting a processor exit might cause bigger impact on the broker. Usually the
exceptions thrown would
@@ -457,23 +475,20 @@ private[kafka] class Processor(val id: Int,
     while(curr != null) {
       try {
         curr.responseAction match {
-          case RequestChannel.NoOpAction => {
+          case RequestChannel.NoOpAction =>
             // There is no response to send to the client, we need to read more pipelined
requests
             // that are sitting in the server's socket buffer
             curr.request.updateRequestMetrics
             trace("Socket server received empty response to send, registering for read: "
+ curr)
             selector.unmute(curr.request.connectionId)
-          }
-          case RequestChannel.SendAction => {
+          case RequestChannel.SendAction =>
             trace("Socket server received response to send, registering for write and sending
data: " + curr)
             selector.send(curr.responseSend)
             inflightResponses += (curr.request.connectionId -> curr)
-          }
-          case RequestChannel.CloseConnectionAction => {
+          case RequestChannel.CloseConnectionAction =>
             curr.request.updateRequestMetrics
             trace("Closing socket connection actively according to the response code.")
-            selector.close(curr.request.connectionId)
-          }
+            close(selector, curr.request.connectionId)
         }
       } finally {
         curr = requestChannel.receiveResponse(id)
@@ -501,7 +516,7 @@ private[kafka] class Processor(val id: Int,
         val localPort = channel.socket().getLocalPort
         val remoteHost = channel.socket().getInetAddress.getHostAddress
         val remotePort = channel.socket().getPort
-        val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
+        val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
         selector.register(connectionId, channel)
       } catch {
         // We explicitly catch all non fatal exceptions and close the socket to avoid socket
leak. The other
@@ -515,12 +530,16 @@ private[kafka] class Processor(val id: Int,
   }
 
   /**
-   * Close all open connections
+   * Close the selector and all open connections
    */
-  def closeAll() {
+  private def closeAll() {
+    selector.channels.asScala.foreach { channel =>
+      close(selector, channel.id)
+    }
     selector.close()
   }
 
+
   /**
    * Wakeup the thread for selection.
    */
@@ -530,29 +549,35 @@ private[kafka] class Processor(val id: Int,
 }
 
 class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
-  private val overrides = overrideQuotas.map(entry => (InetAddress.getByName(entry._1),
entry._2))
+
+  private val overrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host),
count) }
   private val counts = mutable.Map[InetAddress, Int]()
 
-  def inc(addr: InetAddress) {
-    counts synchronized {
-      val count = counts.getOrElse(addr, 0)
-      counts.put(addr, count + 1)
-      val max = overrides.getOrElse(addr, defaultMax)
-      if(count >= max)
-        throw new TooManyConnectionsException(addr, max)
+  def inc(address: InetAddress) {
+    counts.synchronized {
+      val count = counts.getOrElseUpdate(address, 0)
+      counts.put(address, count + 1)
+      val max = overrides.getOrElse(address, defaultMax)
+      if (count >= max)
+        throw new TooManyConnectionsException(address, max)
     }
   }
 
-  def dec(addr: InetAddress) {
-    counts synchronized {
-      val count = counts.get(addr).get
-      if(count == 1)
-        counts.remove(addr)
+  def dec(address: InetAddress) {
+    counts.synchronized {
+      val count = counts.getOrElse(address,
+        throw new IllegalArgumentException(s"Attempted to decrease connection count for address
with no connections, address: $address"))
+      if (count == 1)
+        counts.remove(address)
       else
-        counts.put(addr, count - 1)
+        counts.put(address, count - 1)
     }
   }
 
+  def get(address: InetAddress): Int = counts.synchronized {
+    counts.getOrElse(address, 0)
+  }
+
 }
 
 class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too
many connections from %s (maximum = %d)".format(ip, count))

http://git-wip-us.apache.org/repos/asf/kafka/blob/1265d7cb/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 533538d..6f07a7a 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -51,9 +51,9 @@ class SocketServerTest extends JUnitSuite {
   props.put("socket.request.max.bytes", "50")
   props.put("max.connections.per.ip", "5")
   props.put("connections.max.idle.ms", "60000")
-  val config: KafkaConfig = KafkaConfig.fromProps(props)
-  val metrics = new Metrics()
-  val server: SocketServer = new SocketServer(config, metrics, new SystemTime())
+  val config = KafkaConfig.fromProps(props)
+  val metrics = new Metrics
+  val server = new SocketServer(config, metrics, new SystemTime)
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -82,9 +82,8 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
 
-  def connect(s:SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT)
= {
+  def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT)
=
     new Socket("localhost", server.boundPort(protocol))
-  }
 
   @After
   def cleanup() {
@@ -92,10 +91,7 @@ class SocketServerTest extends JUnitSuite {
     server.shutdown()
   }
 
-  @Test
-  def simpleRequest() {
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
-    val traceSocket = connect(protocol = SecurityProtocol.TRACE)
+  private def producerRequestBytes: Array[Byte] = {
     val correlationId = -1
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
@@ -108,6 +104,14 @@ class SocketServerTest extends JUnitSuite {
     byteBuffer.rewind()
     val serializedBytes = new Array[Byte](byteBuffer.remaining)
     byteBuffer.get(serializedBytes)
+    serializedBytes
+  }
+
+  @Test
+  def simpleRequest() {
+    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val traceSocket = connect(protocol = SecurityProtocol.TRACE)
+    val serializedBytes = producerRequestBytes
 
     // Test PLAINTEXT socket
     sendRequest(plainSocket, 0, serializedBytes)
@@ -122,7 +126,7 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def tooBigRequestIsRejected() {
-    val tooManyBytes = new Array[Byte](server.maxRequestSize + 1)
+    val tooManyBytes = new Array[Byte](server.config.socketRequestMaxBytes + 1)
     new Random().nextBytes(tooManyBytes)
     val socket = connect()
     sendRequest(socket, 0, tooManyBytes)
@@ -170,20 +174,34 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testMaxConnectionsPerIp() {
     // make the maximum allowable number of connections and then leak them
-    val conns = (0 until server.maxConnectionsPerIp).map(i => connect())
+    val conns = (0 until server.config.maxConnectionsPerIp).map(_ => connect())
     // now try one more (should fail)
     val conn = connect()
     conn.setSoTimeout(3000)
     assertEquals(-1, conn.getInputStream().read())
+    conn.close()
+
+    // it should succeed after closing one connection
+    val address = conns.head.getInetAddress
+    conns.head.close()
+    TestUtils.waitUntilTrue(() => server.connectionCount(address) < conns.length,
+      "Failed to decrement connection count after close")
+    val conn2 = connect()
+    val serializedBytes = producerRequestBytes
+    sendRequest(conn2, 0, serializedBytes)
+    val request = server.requestChannel.receiveRequest(2000)
+    assertNotNull(request)
+    conn2.close()
+    conns.tail.foreach(_.close())
   }
 
   @Test
   def testMaxConnectionsPerIPOverrides() {
     val overrideNum = 6
-    val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
-    val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    val overrides = Map("localhost" -> overrideNum)
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
     val serverMetrics = new Metrics()
-    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops),
serverMetrics, new SystemTime())
+    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps),
serverMetrics, new SystemTime())
     try {
       overrideServer.startup()
       // make the maximum allowable number of connections and then leak them
@@ -192,6 +210,8 @@ class SocketServerTest extends JUnitSuite {
       val conn = connect(overrideServer)
       conn.setSoTimeout(3000)
       assertEquals(-1, conn.getInputStream.read())
+      conn.close()
+      conns.foreach(_.close())
     } finally {
       overrideServer.shutdown()
       serverMetrics.close()
@@ -201,11 +221,11 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testSSLSocketServer(): Unit = {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
-    val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0,
enableSSL = true, trustStoreFile = Some(trustStoreFile))
-    overrideprops.put("listeners", "SSL://localhost:0")
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0,
enableSSL = true, trustStoreFile = Some(trustStoreFile))
+    overrideProps.put("listeners", "SSL://localhost:0")
 
-    val serverMetrics = new Metrics()
-    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops),
serverMetrics, new SystemTime())
+    val serverMetrics = new Metrics
+    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps),
serverMetrics, new SystemTime)
     overrideServer.startup()
     try {
       val sslContext = SSLContext.getInstance("TLSv1.2")
@@ -230,6 +250,7 @@ class SocketServerTest extends JUnitSuite {
       sendRequest(sslSocket, 0, serializedBytes)
       processRequest(overrideServer.requestChannel)
       assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
+      sslSocket.close()
     } finally {
       overrideServer.shutdown()
       serverMetrics.close()
@@ -242,5 +263,7 @@ class SocketServerTest extends JUnitSuite {
     val bytes = new Array[Byte](40)
     sendRequest(socket, 0, bytes)
     assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest().session.principal)
+    socket.close()
   }
+
 }


Mime
View raw message