kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1282 Disconnect idle socket connection in Selector; reviewed by Neha Narkhede and Jun Rao
Date Sat, 20 Sep 2014 20:51:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk be5edd2f8 -> 8eac7d789


KAFKA-1282 Disconnect idle socket connection in Selector; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8eac7d78
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8eac7d78
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8eac7d78

Branch: refs/heads/trunk
Commit: 8eac7d789f588b48ad278f4db620402c426c65bc
Parents: be5edd2
Author: nicu marasoiu <nmarasoi@adobe.com>
Authored: Sat Sep 20 13:50:30 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Sat Sep 20 13:51:22 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala | 44 ++++++++++++++++++--
 .../main/scala/kafka/server/KafkaConfig.scala   |  3 ++
 .../main/scala/kafka/server/KafkaServer.scala   |  3 +-
 .../unit/kafka/network/SocketServerTest.scala   |  3 +-
 4 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8eac7d78/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d678990..3a6f8d1 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -46,6 +46,7 @@ class SocketServer(val brokerId: Int,
                    val recvBufferSize: Int,
                    val maxRequestSize: Int = Int.MaxValue,
                    val maxConnectionsPerIp: Int = Int.MaxValue,
+                   val connectionsMaxIdleMs: Long,
                    val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]()) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
@@ -69,7 +70,8 @@ class SocketServer(val brokerId: Int,
                                     newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
                                     numProcessorThreads, 
                                     requestChannel,
-                                    quotas)
+                                    quotas,
+                                    connectionsMaxIdleMs)
       Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
     }
 
@@ -296,9 +298,14 @@ private[kafka] class Processor(val id: Int,
                                val idleMeter: Meter,
                                val totalProcessorThreads: Int,
                                val requestChannel: RequestChannel,
-                               connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
-  
+                               connectionQuotas: ConnectionQuotas,
+                               val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {
+
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
+  private val connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000
+  private var currentTimeNanos = SystemTime.nanoseconds
+  private val lruConnections = new util.LinkedHashMap[SelectionKey, Long]
+  private var nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos
 
   override def run() {
     startupComplete()
@@ -309,7 +316,8 @@ private[kafka] class Processor(val id: Int,
       processNewResponses()
       val startSelectTime = SystemTime.nanoseconds
       val ready = selector.select(300)
-      val idleTime = SystemTime.nanoseconds - startSelectTime
+      currentTimeNanos = SystemTime.nanoseconds
+      val idleTime = currentTimeNanos - startSelectTime
       idleMeter.mark(idleTime)
       // We use a single meter for aggregate idle percentage for the thread pool.
       // Since meter is calculated as total_recorded_value / time_window and
@@ -348,6 +356,7 @@ private[kafka] class Processor(val id: Int,
           }
         }
       }
+      maybeCloseOldestConnection
     }
     debug("Closing selector.")
     swallowError(closeAll())
@@ -355,6 +364,14 @@ private[kafka] class Processor(val id: Int,
     shutdownComplete()
   }
 
+  /**
+   * Close the given key and associated socket
+   */
+  override def close(key: SelectionKey): Unit = {
+    lruConnections.remove(key)
+    super.close(key)
+  }
+
   private def processNewResponses() {
     var curr = requestChannel.receiveResponse(id)
     while(curr != null) {
@@ -415,6 +432,7 @@ private[kafka] class Processor(val id: Int,
    * Process reads from ready sockets
    */
   def read(key: SelectionKey) {
+    lruConnections.put(key, currentTimeNanos)
     val socketChannel = channelFor(key)
     var receive = key.attachment.asInstanceOf[Receive]
     if(key.attachment == null) {
@@ -465,6 +483,24 @@ private[kafka] class Processor(val id: Int,
 
   private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
 
+  private def maybeCloseOldestConnection {
+    if(currentTimeNanos > nextIdleCloseCheckTime) {
+      if(lruConnections.isEmpty) {
+        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos
+      } else {
+        val oldestConnectionEntry = lruConnections.entrySet.iterator().next()
+        val connectionLastActiveTime = oldestConnectionEntry.getValue
+        nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos
+        if(currentTimeNanos > nextIdleCloseCheckTime) {
+          val key: SelectionKey = oldestConnectionEntry.getKey
+          trace("About to close the idle connection from " + key.channel.asInstanceOf[SocketChannel].socket.getRemoteSocketAddress
+            + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis")
+          close(key)
+        }
+      }
+    }
+  }
+
 }
 
 class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8eac7d78/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index dce48db..165c816 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -113,6 +113,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* per-ip or hostname overrides to the default maximum number of connections */
   val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))
 
+  /* idle connections timeout: the server socket processor threads close the connections that idle more than this */
+  val connectionsMaxIdleMs = props.getLong("connections.max.idle.ms", 10*60*1000L)
+
   /*********** Log Configuration ***********/
 
   /* the default number of log partitions per topic */

http://git-wip-us.apache.org/repos/asf/kafka/blob/8eac7d78/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2871118..390fef5 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -92,7 +92,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.socketSendBufferBytes,
                                     config.socketReceiveBufferBytes,
                                     config.socketRequestMaxBytes,
-                                    config.maxConnectionsPerIp)
+                                    config.maxConnectionsPerIp,
+                                    config.connectionsMaxIdleMs)
     socketServer.startup()
 
     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8eac7d78/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 3b83a86..5f4d852 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -41,7 +41,8 @@ class SocketServerTest extends JUnitSuite {
                                               sendBufferSize = 300000,
                                               recvBufferSize = 300000,
                                               maxRequestSize = 50,
-                                              maxConnectionsPerIp = 5)
+                                              maxConnectionsPerIp = 5,
+                                              connectionsMaxIdleMs = 60*1000)
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {


Mime
View raw message