kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2517; Performance Regression post SSL implementation (zero copy)
Date Fri, 02 Oct 2015 21:46:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5764e54de -> 7cfdf19b5


KAFKA-2517; Performance Regression post SSL implementation (zero copy)

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Sriharsha Chintalapani <schintalapani@hortonworks.com>, Ben Stopford <benstopford@gmail.com>,
Jun Rao <junrao@gmail.com>

Closes #273 from ijuma/kafka-2517-ssl-zero-copy-regression


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7cfdf19b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7cfdf19b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7cfdf19b

Branch: refs/heads/trunk
Commit: 7cfdf19b54ab60b9644dcbecccb9130d6621da57
Parents: 5764e54
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Oct 2 14:46:07 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Oct 2 14:46:07 2015 -0700

----------------------------------------------------------------------
 .../common/network/PlaintextTransportLayer.java | 10 +++++--
 .../kafka/common/network/SSLTransportLayer.java | 29 ++++++++++++--------
 .../kafka/common/network/TransportLayer.java    | 18 ++++++++++++
 .../main/scala/kafka/log/FileMessageSet.scala   |  8 +++++-
 4 files changed, 51 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7cfdf19b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index 9bfa3a1..1149c99 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -23,6 +23,7 @@ package org.apache.kafka.common.network;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.SelectionKey;
 
@@ -192,7 +193,7 @@ public class PlaintextTransportLayer implements TransportLayer {
 
     /**
      * Adds the interestOps to selectionKey.
-     * @param interestOps
+     * @param ops
      */
     @Override
     public void addInterestOps(int ops) {
@@ -202,7 +203,7 @@ public class PlaintextTransportLayer implements TransportLayer {
 
     /**
      * Removes the interestOps from selectionKey.
-     * @param interestOps
+     * @param ops
      */
     @Override
     public void removeInterestOps(int ops) {
@@ -213,4 +214,9 @@ public class PlaintextTransportLayer implements TransportLayer {
     public boolean isMute() {
         return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
     }
+
+    @Override
+    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
+        return fileChannel.transferTo(position, count, socketChannel);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7cfdf19b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
index f1cd607..8b4bd9f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.network;
 import java.io.IOException;
 import java.io.EOFException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.CancelledKeyException;
@@ -182,7 +183,7 @@ public class SSLTransportLayer implements TransportLayer {
     * Performs SSL handshake, non blocking.
     * Before application data (kafka protocols) can be sent client & kafka broker must
     * perform ssl handshake.
-    * During the handshake SSLEngine generates encrypted data  that will be transported over
socketChannel.
+    * During the handshake SSLEngine generates encrypted data that will be transported over
socketChannel.
     * Each SSLEngine operation generates SSLEngineResult , of which SSLEngineResult.handshakeStatus
field is used to
     * determine what operation needs to occur to move handshake along.
     * A typical handshake might look like this.
@@ -237,7 +238,7 @@ public class SSLTransportLayer implements TransportLayer {
                               channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(),
netWriteBuffer.position());
                     //if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer
contents
                     //we will break here otherwise we can do need_unwrap in the same call.
-                    if (handshakeStatus != HandshakeStatus.NEED_UNWRAP ||  !flush(netWriteBuffer))
{
+                    if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || !flush(netWriteBuffer))
{
                         key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                         break;
                     }
@@ -311,7 +312,7 @@ public class SSLTransportLayer implements TransportLayer {
      * Sets the interestOps for the selectionKey.
      */
     private void handshakeFinished() throws IOException {
-        // SSLEnginge.getHandshakeStatus is transient and it doesn't record FINISHED status
properly.
+        // SSLEngine.getHandshakeStatus is transient and it doesn't record FINISHED status
properly.
         // It can move from FINISHED status to NOT_HANDSHAKING after the handshake is completed.
         // Hence we also need to check handshakeResult.getHandshakeStatus() if the handshake
finished or not
         if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) {
@@ -336,7 +337,7 @@ public class SSLTransportLayer implements TransportLayer {
     * @return SSLEngineResult
     * @throws IOException
     */
-    private SSLEngineResult  handshakeWrap(Boolean doWrite) throws IOException {
+    private SSLEngineResult handshakeWrap(boolean doWrite) throws IOException {
         log.trace("SSLHandshake handshakeWrap", channelId);
         if (netWriteBuffer.hasRemaining())
             throw new IllegalStateException("handshakeWrap called with netWriteBuffer not
empty");
@@ -362,7 +363,7 @@ public class SSLTransportLayer implements TransportLayer {
     * @return SSLEngineResult
     * @throws IOException
     */
-    private SSLEngineResult handshakeUnwrap(Boolean doRead) throws IOException {
+    private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
         log.trace("SSLHandshake handshakeUnwrap", channelId);
         SSLEngineResult result;
         boolean cont = false;
@@ -438,7 +439,7 @@ public class SSLTransportLayer implements TransportLayer {
                     }
 
                     // appReadBuffer will extended upto currentApplicationBufferSize
-                    // we need to read the existing content into dst before we can do unwrap
again.  If there are no space in dst
+                    // we need to read the existing content into dst before we can do unwrap
again. If there are no space in dst
                     // we can break here.
                     if (dst.hasRemaining())
                         read += readFromAppBuffer(dst);
@@ -479,7 +480,7 @@ public class SSLTransportLayer implements TransportLayer {
      * @param dsts - The buffers into which bytes are to be transferred
      * @param offset - The offset within the buffer array of the first buffer into which
bytes are to be transferred; must be non-negative and no larger than dsts.length.
      * @param length - The maximum number of buffers to be accessed; must be non-negative
and no larger than dsts.length - offset
-     * @returns The number of bytes read, possibly zero, or -1 if the channel has reached
end-of-stream.
+     * @return The number of bytes read, possibly zero, or -1 if the channel has reached
end-of-stream.
      * @throws IOException if some other I/O error occurs
      */
     @Override
@@ -509,7 +510,7 @@ public class SSLTransportLayer implements TransportLayer {
     * Writes a sequence of bytes to this channel from the given buffer.
     *
     * @param src The buffer from which bytes are to be retrieved
-    * @returns The number of bytes read, possibly zero, or -1 if the channel has reached
end-of-stream
+    * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
     * @throws IOException If some other I/O error occurs
     */
     @Override
@@ -557,7 +558,7 @@ public class SSLTransportLayer implements TransportLayer {
     * @throws IOException If some other I/O error occurs
     */
     @Override
-    public long write(ByteBuffer[] srcs, int offset, int length)  throws IOException {
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
             throw new IndexOutOfBoundsException();
         int totalWritten = 0;
@@ -681,12 +682,18 @@ public class SSLTransportLayer implements TransportLayer {
         try {
             sslEngine.closeInbound();
         } catch (SSLException e) {
-            log.debug("SSLEngine.closeInBound() raised an exception.",  e);
+            log.debug("SSLEngine.closeInBound() raised an exception.", e);
         }
     }
 
     @Override
     public boolean isMute() {
-        return  key.isValid() && (key.interestOps() & SelectionKey.OP_READ) ==
0;
+        return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
     }
+
+    @Override
+    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
+        return fileChannel.transferTo(position, count, this);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7cfdf19b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index e9158aa..ff7a3bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -25,6 +25,7 @@ package org.apache.kafka.common.network;
  * a network I/O channel.
  */
 import java.io.IOException;
+import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.nio.channels.GatheringByteChannel;
@@ -83,4 +84,21 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
     void removeInterestOps(int ops);
 
     boolean isMute();
+
+    /**
+     * Transfers bytes from `fileChannel` to this `TransportLayer`.
+     *
+     * This method will delegate to {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)},
+     * but it will unwrap the destination channel, if possible, in order to benefit from zero copy. This is required
+     * because the fast path of `transferTo` is only executed if the destination buffer inherits from an internal JDK
+     * class.
+     *
+     * @param fileChannel The source channel
+     * @param position The position within the file at which the transfer is to begin; must be non-negative
+     * @param count The maximum number of bytes to be transferred; must be non-negative
+     * @return The number of bytes, possibly zero, that were actually transferred
+     * @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)
+     */
+    long transferFrom(FileChannel fileChannel, long position, long count) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7cfdf19b/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 39361fe..949dc02 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -27,6 +27,7 @@ import kafka.message._
 import kafka.common.KafkaException
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import org.apache.kafka.common.network.TransportLayer
 
 /**
  * An on-disk message set. An optional start and end position can be applied to the message
set
@@ -157,7 +158,12 @@ class FileMessageSet private[kafka](@volatile var file: File,
       throw new KafkaException("Size of FileMessageSet %s has been truncated during write:
old size %d, new size %d"
         .format(file.getAbsolutePath, _size.get(), newSize))
     }
-    val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes),
destChannel).toInt
+    val position = start + writePosition
+    val count = math.min(size, sizeInBytes)
+    val bytesTransferred = (destChannel match {
+      case tl: TransportLayer => tl.transferFrom(channel, position, count)
+      case dc => channel.transferTo(position, count, dc)
+    }).toInt
     trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
       + " bytes requested for transfer : " + math.min(size, sizeInBytes))
     bytesTransferred


Mime
View raw message