kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5674; Reduce max.connections.per.ip minimum to 0 (#3610)
Date Mon, 02 Apr 2018 15:45:19 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1dc3027  KAFKA-5674; Reduce max.connections.per.ip minimum to 0 (#3610)
1dc3027 is described below

commit 1dc30272e1e684764563e3091a19a8ff8892f8eb
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
AuthorDate: Mon Apr 2 08:45:15 2018 -0700

    KAFKA-5674; Reduce max.connections.per.ip minimum to 0 (#3610)
    
    By allowing `max.connections.per.ip` to be 0, Kafka can support IP-based filtering using
`max.connections.per.ip.overrides`.
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  2 +-
 .../unit/kafka/network/SocketServerTest.scala      | 43 ++++++++++++++++++++--
 docs/upgrade.html                                  |  1 +
 3 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index cf22305..5a1dca3 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -782,7 +782,7 @@ object KafkaConfig {
       .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
       .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH,
SocketReceiveBufferBytesDoc)
       .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1),
HIGH, SocketRequestMaxBytesDoc)
-      .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(1), MEDIUM,
MaxConnectionsPerIpDoc)
+      .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(0), MEDIUM,
MaxConnectionsPerIpDoc)
       .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides,
MEDIUM, MaxConnectionsPerIpOverridesDoc)
       .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)
 
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index a057e54..0dad3c7 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -20,8 +20,8 @@ package kafka.network
 import java.io._
 import java.net._
 import java.nio.ByteBuffer
+import java.util.{HashMap, Properties, Random}
 import java.nio.channels.SocketChannel
-import java.util.{HashMap, Random}
 import javax.net.ssl._
 
 import com.yammer.metrics.core.{Gauge, Meter}
@@ -134,8 +134,8 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, Some(request.header.toString)))
   }
 
-  def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT)
= {
-    val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)))
+  def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
localAddr: InetAddress = null) = {
+    val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)),
localAddr, 0)
     sockets += socket
     socket
   }
@@ -444,6 +444,43 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
+  def testZeroMaxConnectionsPerIp() {
+    val newProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    newProps.setProperty(KafkaConfig.MaxConnectionsPerIpProp, "0")
+    newProps.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "%s:%s".format("127.0.0.1",
"5"))
+    val server = new SocketServer(KafkaConfig.fromProps(newProps), new Metrics(), Time.SYSTEM,
credentialProvider)
+    try {
+      server.startup()
+      // make the maximum allowable number of connections
+      val conns = (0 until 5).map(_ => connect(server))
+      // now try one more (should fail)
+      val conn = connect(server)
+      conn.setSoTimeout(3000)
+      assertEquals(-1, conn.getInputStream.read())
+      conn.close()
+
+      // it should succeed after closing one connection
+      val address = conns.head.getInetAddress
+      conns.head.close()
+      TestUtils.waitUntilTrue(() => server.connectionCount(address) < conns.length,
+        "Failed to decrement connection count after close")
+      val conn2 = connect(server)
+      val serializedBytes = producerRequestBytes()
+      sendRequest(conn2, serializedBytes)
+      val request = server.requestChannel.receiveRequest(2000)
+      assertNotNull(request)
+
+      // now try to connect from the external facing interface, which should fail
+      val conn3 = connect(s = server, localAddr = InetAddress.getLocalHost)
+      conn3.setSoTimeout(3000)
+      assertEquals(-1, conn3.getInputStream.read())
+      conn3.close()
+    } finally {
+      shutdownServerAndMetrics(server)
+    }
+  }
+
+  @Test
   def testMaxConnectionsPerIpOverrides() {
     val overrideNum = server.config.maxConnectionsPerIp + 1
     val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 324f8df..0c5f5fd 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -67,6 +67,7 @@
 <h5><a id="upgrade_120_notable" href="#upgrade_120_notable">Notable changes in
1.2.0</a></h5>
 <ul>
     <li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a>
increases the default offset retention time from 1 day to 7 days. This makes it less likely
to "lose" offsets in an application that commits infrequently. It also increases the active
set of offsets and therefore can increase memory usage on the broker. Note that the console
consumer currently enables offset commit by default and can be the source of a large number
of offsets which this change will now preserve for [...]
+    <li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a>
lowers the minimum allowed value of <code>max.connections.per.ip</code> to zero, which
allows IP-based filtering of inbound connections when combined with <code>max.connections.per.ip.overrides</code>.</li>
 </ul>
 
 <h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol
Versions</a></h5>

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message