kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5595; Ensure client connection ids are not reused too quickly
Date Mon, 14 Aug 2017 16:16:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8265a4389 -> 21ea4b1d2


KAFKA-5595; Ensure client connection ids are not reused too quickly

When there are broker delays that cause a response to take longer
than `connections.max.idle.ms`, connections may be closed by the
broker (as well as by the client) before the response is processed.
If the port is reused, broker may send the outstanding response to
a new connection with the reused port. The new connection will end
up with correlation id mismatch, requiring process restart. This
is also a security exposure, since clients may receive a response
intended for a different connection.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3530 from rajinisivaram/KAFKA-5595


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21ea4b1d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21ea4b1d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21ea4b1d

Branch: refs/heads/trunk
Commit: 21ea4b1d2a1b99aa804bd4461015abaf31673923
Parents: 8265a43
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Mon Aug 14 17:07:42 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Aug 14 17:07:50 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/common/network/Selector.java   |  15 +-
 .../main/scala/kafka/network/SocketServer.scala |  50 ++++---
 .../unit/kafka/network/SocketServerTest.scala   | 136 ++++++++++++++++++-
 3 files changed, 181 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/21ea4b1d/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 38226f9..b0f31a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -193,6 +193,8 @@ public class Selector implements Selectable, AutoCloseable {
     public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
throws IOException {
         if (this.channels.containsKey(id))
             throw new IllegalStateException("There is already a connection for id " + id);
+        if (this.closingChannels.containsKey(id))
+            throw new IllegalStateException("There is already a connection for id " + id
+ " that is still being closed");
 
         SocketChannel socketChannel = SocketChannel.open();
         socketChannel.configureBlocking(false);
@@ -239,9 +241,20 @@ public class Selector implements Selectable, AutoCloseable {
     /**
      * Register the nioSelector with an existing channel
      * Use this on server-side, when a connection is accepted by a different thread but processed
by the Selector
-     * Note that we are not checking if the connection id is valid - since the connection
already exists
+     * <p>
+     * If a connection already exists with the same connection id in `channels` or `closingChannels`,
+     * an exception is thrown. Connection ids must be chosen to avoid conflict when remote
ports are reused.
+     * Kafka brokers add an incrementing index to the connection id to avoid reuse in the
timing window
+     * where an existing connection may not yet have been closed by the broker when a new
connection with
+     * the same remote host:port is processed.
+     * </p>
      */
     public void register(String id, SocketChannel socketChannel) throws ClosedChannelException
{
+        if (this.channels.containsKey(id))
+            throw new IllegalStateException("There is already a connection for id " + id);
+        if (this.closingChannels.containsKey(id))
+            throw new IllegalStateException("There is already a connection for id " + id
+ " that is still being closed");
+
         SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
         KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
         key.attach(channel);

http://git-wip-us.apache.org/repos/asf/kafka/blob/21ea4b1d/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index f418fdd..842c6a0 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -399,17 +399,17 @@ private[kafka] class Processor(val id: Int,
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
-      case Array(local, remote) => BrokerEndPoint.parseHostPort(local).flatMap { case
(localHost, localPort) =>
+      case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap
{ case (localHost, localPort) =>
         BrokerEndPoint.parseHostPort(remote).map { case (remoteHost, remotePort) =>
-          ConnectionId(localHost, localPort, remoteHost, remotePort)
+          ConnectionId(localHost, localPort, remoteHost, remotePort, Integer.parseInt(index))
         }
       }
       case _ => None
     }
   }
 
-  private case class ConnectionId(localHost: String, localPort: Int, remoteHost: String,
remotePort: Int) {
-    override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort"
+  private[network] case class ConnectionId(localHost: String, localPort: Int, remoteHost:
String, remotePort: Int, index: Int) {
+    override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index"
   }
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
@@ -442,6 +442,11 @@ private[kafka] class Processor(val id: Int,
     ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache),
     memoryPool)
 
+  // Connection ids have the format `localAddr:localPort-remoteAddr:remotePort-index`. The
index is a
+  // non-negative incrementing value that ensures that even if remotePort is reused after
a connection is
+  // closed, connection ids are not reused while requests from the closed connection are
being processed.
+  private var nextConnectionIndex = 0
+
   override def run() {
     startupComplete()
     while (isRunning) {
@@ -502,13 +507,15 @@ private[kafka] class Processor(val id: Int,
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send)
{
     val connectionId = response.request.connectionId
     trace(s"Socket server received response to send to $connectionId, registering for write
and sending data: $response")
-    val channel = selector.channel(connectionId)
-    // `channel` can be null if the selector closed the connection because it was idle for
too long
-    if (channel == null) {
+    // `channel` can be None if the connection was closed remotely or if selector closed
it for being idle for too long
+    if (channel(connectionId).isEmpty) {
       warn(s"Attempting to send response via channel for which there is no open connection,
connection id $connectionId")
       response.request.updateRequestMetrics(0L)
     }
-    else {
+    // Invoke send for closingChannel as well so that the send is failed and the channel
closed properly and
+    // removed from the Selector after discarding any pending staged receives.
+    // `openOrClosingChannel` can be None if the selector closed the connection because it
was idle for too long
+    if (!openOrClosingChannel(connectionId).isEmpty) {
       selector.send(responseSend)
       inflightResponses += (connectionId -> response)
     }
@@ -558,9 +565,7 @@ private[kafka] class Processor(val id: Int,
   }
 
   private def updateRequestMetrics(request: RequestChannel.Request) {
-    val channel = selector.channel(request.connectionId)
-    val openOrClosingChannel = if (channel != null) channel else selector.closingChannel(request.connectionId)
-    val networkThreadTimeNanos = if (openOrClosingChannel != null) openOrClosingChannel.getAndResetNetworkThreadTimeNanos()
else 0L
+    val networkThreadTimeNanos = openOrClosingChannel(request.connectionId).fold(0L) (_.getAndResetNetworkThreadTimeNanos())
     request.updateRequestMetrics(networkThreadTimeNanos)
   }
 
@@ -591,12 +596,7 @@ private[kafka] class Processor(val id: Int,
       val channel = newConnections.poll()
       try {
         debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
-        val localHost = channel.socket().getLocalAddress.getHostAddress
-        val localPort = channel.socket().getLocalPort
-        val remoteHost = channel.socket().getInetAddress.getHostAddress
-        val remotePort = channel.socket().getPort
-        val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
-        selector.register(connectionId, channel)
+        selector.register(connectionId(channel.socket), channel)
       } catch {
         // We explicitly catch all non fatal exceptions and close the socket to avoid a socket
leak. The other
         // throwables will be caught in processor and logged as uncaught exceptions.
@@ -619,6 +619,22 @@ private[kafka] class Processor(val id: Int,
     selector.close()
   }
 
+  // `protected` to allow override for testing
+  protected[network] def connectionId(socket: Socket): String = {
+    val localHost = socket.getLocalAddress.getHostAddress
+    val localPort = socket.getLocalPort
+    val remoteHost = socket.getInetAddress.getHostAddress
+    val remotePort = socket.getPort
+    val connId = ConnectionId(localHost, localPort, remoteHost, remotePort, nextConnectionIndex).toString
+    nextConnectionIndex = if (nextConnectionIndex == Int.MaxValue) 0 else nextConnectionIndex
+ 1
+    connId
+  }
+
+  // Visible for testing
+  private[network] def openOrClosingChannel(connectionId: String): Option[KafkaChannel] =
{
+    Option(selector.channel(connectionId)).orElse(Option(selector.closingChannel(connectionId)))
+  }
+
   /* For test usage */
   private[network] def channel(connectionId: String): Option[KafkaChannel] =
     Option(selector.channel(connectionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/21ea4b1d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index f9a5ac2..edb814d 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -20,6 +20,7 @@ package kafka.network
 import java.io._
 import java.net._
 import java.nio.ByteBuffer
+import java.nio.channels.SocketChannel
 import java.util.{HashMap, Random}
 import javax.net.ssl._
 
@@ -31,12 +32,12 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
+import org.apache.kafka.common.network.{KafkaChannel, ListenerName, NetworkSend, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Time, MockTime}
 import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
@@ -185,6 +186,137 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
+  def testConnectionId() {
+    val sockets = (1 to 5).map(_ => connect(protocol = SecurityProtocol.PLAINTEXT))
+    val serializedBytes = producerRequestBytes
+
+    val requests = sockets.map{socket =>
+      sendRequest(socket, serializedBytes)
+      server.requestChannel.receiveRequest(2000)
+    }
+    requests.zipWithIndex.foreach { case (request, i) =>
+      val index = request.connectionId.split("-").last
+      assertEquals(i.toString, index)
+    }
+
+    sockets.foreach(_.close)
+  }
+
+  @Test
+  def testIdleConnection() {
+    val idleTimeMs = 60000
+    val time = new MockTime()
+    props.put(KafkaConfig.ConnectionsMaxIdleMsProp, idleTimeMs.toString)
+    val serverMetrics = new Metrics
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time,
credentialProvider)
+
+    def openChannel(request: RequestChannel.Request): Option[KafkaChannel] =
+      overrideServer.processor(request.processor).channel(request.connectionId)
+    def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
+      overrideServer.processor(request.processor).openOrClosingChannel(request.connectionId)
+
+    try {
+      overrideServer.startup()
+      val serializedBytes = producerRequestBytes
+
+      // Connection with no staged receives
+      val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
+      sendRequest(socket1, serializedBytes)
+      val request1 = overrideServer.requestChannel.receiveRequest(2000)
+      assertTrue("Channel not open", openChannel(request1).nonEmpty)
+      assertEquals(openChannel(request1), openOrClosingChannel(request1))
+
+      time.sleep(idleTimeMs + 1)
+      TestUtils.waitUntilTrue(() => openOrClosingChannel(request1).isEmpty, "Failed to
close idle channel")
+      assertTrue("Channel not removed", openChannel(request1).isEmpty)
+      processRequest(overrideServer.requestChannel, request1)
+
+      // Connection with staged receives
+      val socket2 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
+      sendRequest(socket2, serializedBytes)
+      sendRequest(socket2, serializedBytes)
+      val request2 = overrideServer.requestChannel.receiveRequest(2000)
+
+      time.sleep(idleTimeMs + 1)
+      TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to close idle
channel")
+      assertTrue("Channel removed without processing staging receives", openOrClosingChannel(request2).nonEmpty)
+      processRequest(overrideServer.requestChannel, request2) // this triggers a failed send
since channel has been closed
+      TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, "Failed to
remove channel with failed sends")
+      assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200))
+
+    } finally {
+      overrideServer.shutdown()
+      serverMetrics.close()
+    }
+  }
+
+  @Test
+  def testConnectionIdReuse() {
+    val idleTimeMs = 60000
+    val time = new MockTime()
+    props.put(KafkaConfig.ConnectionsMaxIdleMsProp, idleTimeMs.toString)
+    val serverMetrics = new Metrics
+    val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time,
credentialProvider) {
+      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName:
ListenerName,
+                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor
= {
+        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider,
memoryPool) {
+          override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
+        }
+      }
+    }
+
+    def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId)
+    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId)
+    def connectionCount = overrideServer.connectionCount(InetAddress.getByName("127.0.0.1"))
+
+    try {
+      overrideServer.startup()
+      val socket1 = connect(overrideServer)
+      TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to create channel")
+      val channel1 = openChannel.getOrElse(throw new RuntimeException("Channel not found"))
+
+      // Create new connection with same id when `channel1` is still open and in Selector.channels
+      // Check that new connection is closed and openChannel still contains `channel1`
+      connect(overrideServer)
+      TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close channel")
+      assertSame(channel1, openChannel.getOrElse(throw new RuntimeException("Channel not
found")))
+
+      // Send a request to `channel1` and advance time beyond idle time so that `channel1`
is
+      // closed with staged receives and is in Selector.closingChannels
+      val serializedBytes = producerRequestBytes
+      (1 to 3).foreach(_ => sendRequest(socket1, serializedBytes))
+      val request = overrideServer.requestChannel.receiveRequest(2000)
+      time.sleep(idleTimeMs + 1)
+      TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed")
+      assertTrue("Channel removed without processing staging receives", openOrClosingChannel.nonEmpty)
+
+      // Create new connection with same id when `channel1` is in Selector.closingChannels
+      // Check that new connection is closed and openOrClosingChannel still contains `channel1`
+      connect(overrideServer)
+      TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close channel")
+      assertSame(channel1, openOrClosingChannel.getOrElse(throw new RuntimeException("Channel
not found")))
+
+      // Complete request with failed send so that `channel1` is removed from Selector.closingChannels
+      processRequest(overrideServer.requestChannel, request)
+      TestUtils.waitUntilTrue(() => connectionCount == 0, "Failed to remove channel with
failed send")
+      assertTrue("Channel not removed", openOrClosingChannel.isEmpty)
+
+      // Check that new connections can be created with the same id since `channel1` is no
longer in Selector
+      connect(overrideServer)
+      TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to open new channel")
+      val newChannel = openChannel.getOrElse(throw new RuntimeException("Channel not found"))
+      assertNotSame(channel1, newChannel)
+      newChannel.disconnect()
+
+    } finally {
+      overrideServer.shutdown()
+      serverMetrics.close()
+    }
+  }
+
+  @Test
   def testSocketsCloseOnShutdown() {
     // open a connection
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)


Mime
View raw message