kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joest...@apache.org
Subject kafka git commit: KAFKA-1512 Fixes for limit the maximum number of connections per ip address patch by Jeff Holoman reviewed by Jay Krepps and Gwen Shapira
Date Wed, 07 Jan 2015 03:39:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 fec9f32da -> 367ee76c0


KAFKA-1512 Fixes for limit the maximum number of connections per ip address patch by Jeff
Holoman reviewed by Jay Krepps and Gwen Shapira


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/367ee76c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/367ee76c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/367ee76c

Branch: refs/heads/0.8.2
Commit: 367ee76c08ecac376198bf1dc05820589055641b
Parents: fec9f32
Author: Joe Stein <joe.stein@stealth.ly>
Authored: Tue Jan 6 22:39:20 2015 -0500
Committer: Joe Stein <joe.stein@stealth.ly>
Committed: Tue Jan 6 22:39:20 2015 -0500

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala |  2 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  3 +-
 .../unit/kafka/network/SocketServerTest.scala   | 40 +++++++++++++++-----
 3 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/367ee76c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e451592..39b1651 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -47,7 +47,7 @@ class SocketServer(val brokerId: Int,
                    val maxRequestSize: Int = Int.MaxValue,
                    val maxConnectionsPerIp: Int = Int.MaxValue,
                    val connectionsMaxIdleMs: Long,
-                   val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]())
extends Logging with KafkaMetricsGroup {
+                   val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with
KafkaMetricsGroup {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)

http://git-wip-us.apache.org/repos/asf/kafka/blob/367ee76c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1bf7d10..1691ad7 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -94,7 +94,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends
Logg
                                       config.socketReceiveBufferBytes,
                                       config.socketRequestMaxBytes,
                                       config.maxConnectionsPerIp,
-                                      config.connectionsMaxIdleMs)
+                                      config.connectionsMaxIdleMs,
+                                      config.maxConnectionsPerIpOverrides)
       socketServer.startup()
 
       replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager,
isShuttingDown)

http://git-wip-us.apache.org/repos/asf/kafka/blob/367ee76c/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 5f4d852..6c8a4f7 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -30,6 +30,7 @@ import kafka.common.TopicAndPartition
 import kafka.message.ByteBufferMessageSet
 import java.nio.channels.SelectionKey
 import kafka.utils.TestUtils
+import scala.collection.Map
 
 class SocketServerTest extends JUnitSuite {
 
@@ -42,7 +43,8 @@ class SocketServerTest extends JUnitSuite {
                                               recvBufferSize = 300000,
                                               maxRequestSize = 50,
                                               maxConnectionsPerIp = 5,
-                                              connectionsMaxIdleMs = 60*1000)
+                                              connectionsMaxIdleMs = 60*1000,
+                                              maxConnectionsPerIpOverrides = Map.empty[String,Int])
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -71,13 +73,12 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
 
-  def connect() = new Socket("localhost", server.port)
+  def connect(s:SocketServer = server) = new Socket("localhost", s.port)
 
   @After
   def cleanup() {
     server.shutdown()
   }
-  
   @Test
   def simpleRequest() {
     val socket = connect()
@@ -141,19 +142,38 @@ class SocketServerTest extends JUnitSuite {
     // doing a subsequent send should throw an exception as the connection should be closed.
     sendRequest(socket, 0, bytes)
   }
-  
+
   @Test
   def testMaxConnectionsPerIp() {
     // make the maximum allowable number of connections and then leak them
     val conns = (0 until server.maxConnectionsPerIp).map(i => connect())
-    
     // now try one more (should fail)
-    try {
       val conn = connect()
-      sendRequest(conn, 100, "hello".getBytes)
+      conn.setSoTimeout(3000)
       assertEquals(-1, conn.getInputStream().read())
-    } catch {
-      case e: IOException => // this is good
-    }
+  }
+  @Test
+  def testMaxConnectionsPerIPOverrides(): Unit = {
+    val overrideNum = 6
+    val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
+    val overrideServer: SocketServer = new SocketServer(0,
+                                                host = null,
+                                                port = kafka.utils.TestUtils.choosePort,
+                                                numProcessorThreads = 1,
+                                                maxQueuedRequests = 50,
+                                                sendBufferSize = 300000,
+                                                recvBufferSize = 300000,
+                                                maxRequestSize = 50,
+                                                maxConnectionsPerIp = 5,
+                                                connectionsMaxIdleMs = 60*1000,
+                                                maxConnectionsPerIpOverrides = overrides)
+    overrideServer.startup()
+    // make the maximum allowable number of connections and then leak them
+    val conns = ((0 until overrideNum).map(i => connect(overrideServer)))
+    // now try one more (should fail)
+    val conn = connect(overrideServer)
+    conn.setSoTimeout(3000)
+    assertEquals(-1, conn.getInputStream.read())
+    overrideServer.shutdown()
   }
 }


Mime
View raw message