kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [3/4] kafka git commit: KAFKA-2460; Fix capitalisation in SSL classes
Date Sat, 24 Oct 2015 16:45:06 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
new file mode 100644
index 0000000..e2d6b3b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -0,0 +1,733 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.CancelledKeyException;
+
+import java.security.Principal;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Transport layer for SSL communication
+ */
+public class SslTransportLayer implements TransportLayer {
+    private static final Logger log = LoggerFactory.getLogger(SslTransportLayer.class);
+    private final String channelId;
+    private final SSLEngine sslEngine;
+    private final SelectionKey key;
+    private final SocketChannel socketChannel;
+    private final boolean enableRenegotiation;
+
+    private HandshakeStatus handshakeStatus;
+    private SSLEngineResult handshakeResult;
+    private boolean handshakeComplete = false;
+    private boolean closing = false;
+    private ByteBuffer netReadBuffer;
+    private ByteBuffer netWriteBuffer;
+    private ByteBuffer appReadBuffer;
+    private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
+
+    public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+        // Renegotiation stays switched off by default until the known issues with the existing implementation are fixed
+        final boolean renegotiationEnabled = false;
+        final SslTransportLayer layer = new SslTransportLayer(channelId, key, sslEngine, renegotiationEnabled);
+        layer.startHandshake();
+        return layer;
+    }
+
+    // Package-private on purpose: production code should go through `create`, tests may call this directly.
+    // Note that this constructor does not start the handshake and does not allocate the buffers.
+    SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException {
+        this.channelId = channelId;
+        this.sslEngine = sslEngine;
+        this.enableRenegotiation = enableRenegotiation;
+        this.key = key;
+        // the selection key is expected to be registered on a SocketChannel
+        this.socketChannel = (SocketChannel) key.channel();
+    }
+
+    /**
+     * Allocates the network/application buffers, resets the handshake state
+     * and asks the SSLEngine to begin the handshake.
+     */
+    protected void startHandshake() throws IOException {
+        netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
+        netWriteBuffer = ByteBuffer.allocate(netWriteBufferSize());
+        appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
+
+        // both network buffers start out empty: position == limit == 0
+        netWriteBuffer.position(0);
+        netWriteBuffer.limit(0);
+        netReadBuffer.position(0);
+        netReadBuffer.limit(0);
+
+        // reset the state flags before kicking off the handshake
+        handshakeComplete = false;
+        closing = false;
+
+        sslEngine.beginHandshake();
+        handshakeStatus = sslEngine.getHandshakeStatus();
+    }
+
+    /**
+     * Reports whether the SSL handshake has completed.
+     */
+    @Override
+    public boolean ready() {
+        return this.handshakeComplete;
+    }
+
+    /**
+     * Finishes the pending connect on the socket channel, then replaces the key's
+     * interest in OP_CONNECT with interest in OP_READ.
+     */
+    @Override
+    public void finishConnect() throws IOException {
+        socketChannel.finishConnect();
+        final int currentOps = key.interestOps();
+        final int withoutConnect = currentOps & ~SelectionKey.OP_CONNECT;
+        key.interestOps(withoutConnect | SelectionKey.OP_READ);
+    }
+
+    /**
+     * Cancels the selection key of this transport layer.
+     */
+    @Override
+    public void disconnect() {
+        this.key.cancel();
+    }
+
+    /**
+     * Returns the underlying socket channel.
+     */
+    @Override
+    public SocketChannel socketChannel() {
+        return this.socketChannel;
+    }
+
+    /**
+     * Returns true while the underlying socket channel is open.
+     */
+    @Override
+    public boolean isOpen() {
+        final boolean open = socketChannel.isOpen();
+        return open;
+    }
+
+    /**
+     * Returns true if the underlying socket channel is connected.
+     */
+    @Override
+    public boolean isConnected() {
+        final boolean connected = socketChannel.isConnected();
+        return connected;
+    }
+
+
+    /**
+    * Sends a SSL close message (best effort) and closes socketChannel.
+    * Calling this method again after the first call is a no-op.
+    *
+    * Failures while sending the close message or while closing the socket are logged
+    * and not propagated. The socket, the socket channel and the selection key are
+    * always released, even if the close message could not be sent.
+    *
+    * @throws IOException declared for interface compatibility
+    */
+    @Override
+    public void close() throws IOException {
+        if (closing) return;
+        closing = true;
+        sslEngine.closeOutbound();
+        try {
+            if (!flush(netWriteBuffer)) {
+                throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
+            }
+            //prep the buffer for the close message
+            netWriteBuffer.clear();
+            //perform the close, since we called sslEngine.closeOutbound
+            SSLEngineResult handshake = sslEngine.wrap(emptyBuf, netWriteBuffer);
+            //we should be in a close state
+            if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) {
+                throw new IOException("Invalid close state, will not send network data.");
+            }
+            netWriteBuffer.flip();
+            flush(netWriteBuffer);
+        } catch (IOException ie) {
+            log.warn("Failed to send SSL Close message ", ie);
+        } finally {
+            // Release the socket no matter what happened above; previously a failure to
+            // send the close message left the socket channel open.
+            try {
+                socketChannel.socket().close();
+                socketChannel.close();
+            } catch (IOException ie) {
+                log.warn("Failed to close socket channel ", ie);
+            } finally {
+                key.attach(null);
+                key.cancel();
+            }
+        }
+    }
+
+    /**
+     * Tells whether netWriteBuffer still holds bytes that have not been written out.
+     */
+    @Override
+    public boolean hasPendingWrites() {
+        final boolean pending = netWriteBuffer.hasRemaining();
+        return pending;
+    }
+
+    /**
+    * Attempts a single write of the buffer's remaining bytes to the socket channel.
+    * @param buf ByteBuffer to drain
+    * @return boolean true if the buffer was already empty or has been written out completely, false otherwise
+    * @throws IOException if the write on the socket channel fails
+    */
+    private boolean flush(ByteBuffer buf) throws IOException {
+        // nothing to send: the buffer counts as flushed
+        if (!buf.hasRemaining()) {
+            return true;
+        }
+        final int pending = buf.remaining();
+        final int written = socketChannel.write(buf);
+        return written >= pending;
+    }
+
+    /**
+    * Performs one step of the SSL handshake, non blocking.
+    * Before application data (kafka protocols) can be sent client & kafka broker must
+    * perform ssl handshake.
+    * During the handshake SSLEngine generates encrypted data that will be transported over socketChannel.
+    * Each SSLEngine operation generates SSLEngineResult , of which SSLEngineResult.handshakeStatus field is used to
+    * determine what operation needs to occur to move handshake along.
+    * A typical handshake might look like this.
+    * +-------------+----------------------------------+-------------+
+    * |  client     |  SSL/TLS message                 | HSStatus    |
+    * +-------------+----------------------------------+-------------+
+    * | wrap()      | ClientHello                      | NEED_UNWRAP |
+    * | unwrap()    | ServerHello/Cert/ServerHelloDone | NEED_WRAP   |
+    * | wrap()      | ClientKeyExchange                | NEED_WRAP   |
+    * | wrap()      | ChangeCipherSpec                 | NEED_WRAP   |
+    * | wrap()      | Finished                         | NEED_UNWRAP |
+    * | unwrap()    | ChangeCipherSpec                 | NEED_UNWRAP |
+    * | unwrap()    | Finished                         | FINISHED    |
+    * +-------------+----------------------------------+-------------+
+    *
+    * Each call first tries to flush any data left in netWriteBuffer; if that data cannot be
+    * written out completely, OP_WRITE is registered on the key and the call returns without
+    * touching the SSLEngine. Otherwise the call dispatches on the engine's current handshake status.
+    * The NEED_WRAP case deliberately falls through into NEED_UNWRAP, and NEED_UNWRAP into FINISHED,
+    * when the conditions documented inline are met.
+    *
+    * @throws IOException an EOFException if the engine reports CLOSED during wrap/unwrap or the peer
+    *         closes the connection, an SSLException (after handshakeFailure() has been invoked) if the
+    *         SSLEngine rejects the handshake, or any I/O error of the socket channel
+    * @throws IllegalStateException if a buffer state is encountered that should be impossible at this point
+    */
+    @Override
+    public void handshake() throws IOException {
+        // snapshot the readiness of the key; it decides whether wrap/unwrap may touch the socket
+        boolean read = key.isReadable();
+        boolean write = key.isWritable();
+        handshakeComplete = false;
+        handshakeStatus = sslEngine.getHandshakeStatus();
+        // pending handshake data must be sent before the engine can make progress
+        if (!flush(netWriteBuffer)) {
+            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            return;
+        }
+        try {
+            switch (handshakeStatus) {
+                case NEED_TASK:
+                    log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshakeStatus = runDelegatedTasks();
+                    break;
+                case NEED_WRAP:
+                    log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshakeResult = handshakeWrap(write);
+                    if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                        // netWriteBuffer was too small for the engine's output: grow it to the
+                        // session's current packet size, switching to write mode and back around the resize
+                        int currentNetWriteBufferSize = netWriteBufferSize();
+                        netWriteBuffer.compact();
+                        netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
+                        netWriteBuffer.flip();
+                        if (netWriteBuffer.limit() >= currentNetWriteBufferSize) {
+                            throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.limit() +
+                                                            ") >= network buffer size (" + currentNetWriteBufferSize + ")");
+                        }
+                    } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                        throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
+                    } else if (handshakeResult.getStatus() == Status.CLOSED) {
+                        throw new EOFException();
+                    }
+                    log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    //if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer contents
+                    //we will break here otherwise we can do need_unwrap in the same call.
+                    if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || !flush(netWriteBuffer)) {
+                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                        break;
+                    }
+                    // intentional fall-through into NEED_UNWRAP
+                case NEED_UNWRAP:
+                    log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    // retry the unwrap for as long as the application buffer is reported as too small
+                    do {
+                        handshakeResult = handshakeUnwrap(read);
+                        if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                            int currentAppBufferSize = applicationBufferSize();
+                            appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
+                            // NOTE(review): this branch handles BUFFER_OVERFLOW of the application buffer, yet the
+                            // message below says "underflow" and "packet buffer size" - looks like a copy/paste slip
+                            if (appReadBuffer.position() > currentAppBufferSize) {
+                                throw new IllegalStateException("Buffer underflow when available data size (" + appReadBuffer.position() +
+                                                                ") > packet buffer size (" + currentAppBufferSize + ")");
+                            }
+                        }
+                    } while (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW);
+                    if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                        // not enough network data for a complete record yet: make sure the read buffer
+                        // is large enough for the session's current packet size
+                        int currentNetReadBufferSize = netReadBufferSize();
+                        netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
+                        if (netReadBuffer.position() >= currentNetReadBufferSize) {
+                            throw new IllegalStateException("Buffer underflow when there is available data");
+                        }
+                    } else if (handshakeResult.getStatus() == Status.CLOSED) {
+                        throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP");
+                    }
+                    log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+
+                    //if handshakeStatus is FINISHED then fall through to the FINISHED case.
+                    //after handshake is finished there is no data left to read/write in socketChannel.
+                    //so the selector won't invoke this channel if we don't go through the handshakeFinished here.
+                    if (handshakeStatus != HandshakeStatus.FINISHED) {
+                        if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
+                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                        } else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
+                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                        }
+                        break;
+                    }
+                    // intentional fall-through into FINISHED
+                case FINISHED:
+                    handshakeFinished();
+                    break;
+                case NOT_HANDSHAKING:
+                    handshakeFinished();
+                    break;
+                default:
+                    throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
+            }
+
+        } catch (SSLException e) {
+            // close both directions of the engine before propagating the failure
+            handshakeFailure();
+            throw e;
+        }
+    }
+
+    /**
+     * Runs another handshake step if renegotiation is enabled for this layer,
+     * otherwise rejects the renegotiation attempt with an SSLHandshakeException.
+     */
+    private void renegotiate() throws IOException {
+        if (enableRenegotiation) {
+            handshake();
+        } else {
+            throw new SSLHandshakeException("Renegotiation is not supported");
+        }
+    }
+
+
+    /**
+     * Runs, on the calling thread, every delegated task currently handed out by delegatedTask().
+     * @return HandshakeStatus of the SSLEngine once no task is left
+     */
+    private HandshakeStatus runDelegatedTasks() {
+        Runnable pending = delegatedTask();
+        while (pending != null) {
+            pending.run();
+            pending = delegatedTask();
+        }
+        return sslEngine.getHandshakeStatus();
+    }
+
+    /**
+     * Verifies that the last handshake result reported FINISHED, marks the handshake as complete
+     * once netWriteBuffer has been drained and adjusts OP_WRITE on the selectionKey accordingly.
+     *
+     * @throws IOException if the last handshake result did not report FINISHED
+     */
+    private void handshakeFinished() throws IOException {
+        // SSLEngine.getHandshakeStatus is transient: it can already have moved from FINISHED to
+        // NOT_HANDSHAKING, so the status recorded in the last SSLEngineResult is what gets checked.
+        if (handshakeResult.getHandshakeStatus() != HandshakeStatus.FINISHED) {
+            throw new IOException("NOT_HANDSHAKING during handshake");
+        }
+
+        // the handshake only counts as complete once the last package has been delivered
+        handshakeComplete = !netWriteBuffer.hasRemaining();
+        final int currentOps = key.interestOps();
+        if (handshakeComplete) {
+            // nothing left to send: stop asking for write readiness
+            key.interestOps(currentOps & ~SelectionKey.OP_WRITE);
+        } else {
+            // still data to write: keep asking for write readiness
+            key.interestOps(currentOps | SelectionKey.OP_WRITE);
+        }
+
+        log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ",
+                  channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+    }
+
+    /**
+    * Lets the SSLEngine produce its next handshake message into netWriteBuffer and
+    * records the resulting handshake status.
+    * @param doWrite boolean whether the produced data should be flushed to the socket right away
+    * @return SSLEngineResult of the wrap operation
+    * @throws IOException if wrapping or flushing fails
+    * @throws IllegalStateException if netWriteBuffer still contains unsent data
+    */
+    private SSLEngineResult handshakeWrap(boolean doWrite) throws IOException {
+        log.trace("SSLHandshake handshakeWrap {}", channelId);
+        // wrapping over unsent data would lose it, so the buffer has to be empty here
+        if (netWriteBuffer.hasRemaining()) {
+            throw new IllegalStateException("handshakeWrap called with netWriteBuffer not empty");
+        }
+        netWriteBuffer.clear();
+        final SSLEngineResult wrapResult = sslEngine.wrap(emptyBuf, netWriteBuffer);
+        // switch the buffer to read mode so its content can be written to the socket
+        netWriteBuffer.flip();
+
+        final HandshakeStatus statusAfterWrap = wrapResult.getHandshakeStatus();
+        handshakeStatus = statusAfterWrap;
+        if (statusAfterWrap == HandshakeStatus.NEED_TASK && wrapResult.getStatus() == Status.OK) {
+            handshakeStatus = runDelegatedTasks();
+        }
+
+        if (doWrite) {
+            flush(netWriteBuffer);
+        }
+        return wrapResult;
+    }
+
+    /**
+    * Performs the handshake unwrap: optionally reads more data from the socket into netReadBuffer
+    * and lets the SSLEngine consume it. Unwrapping is repeated while the engine keeps reporting
+    * OK / NEED_UNWRAP and netReadBuffer still holds unconsumed bytes. The handshakeStatus field is
+    * updated on every iteration, running delegated tasks when the engine asks for them.
+    * @param doRead boolean whether to read from the socket channel before unwrapping
+    * @return SSLEngineResult of the last unwrap operation
+    * @throws IOException an EOFException if the socket channel reports end-of-stream, or any
+    *         error raised by the socket read or by the SSLEngine
+    */
+    private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
+        log.trace("SSLHandshake handshakeUnwrap {}", channelId);
+        SSLEngineResult result;
+        boolean cont = false;
+        int read = 0;
+        if (doRead)  {
+            read = socketChannel.read(netReadBuffer);
+            if (read == -1) throw new EOFException("EOF during handshake.");
+        }
+        do {
+            //prepare the buffer with the incoming data
+            netReadBuffer.flip();
+            result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
+            //move the bytes that were not consumed to the front and switch back to write mode
+            netReadBuffer.compact();
+            handshakeStatus = result.getHandshakeStatus();
+            if (result.getStatus() == SSLEngineResult.Status.OK &&
+                result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                handshakeStatus = runDelegatedTasks();
+            }
+            //keep unwrapping only while the engine succeeded and asks for more input
+            cont = result.getStatus() == SSLEngineResult.Status.OK &&
+                handshakeStatus == HandshakeStatus.NEED_UNWRAP;
+            log.trace("SSLHandshake handshakeUnwrap: handshakeStatus {} status {}", handshakeStatus, result.getStatus());
+        } while (netReadBuffer.position() != 0 && cont);
+
+        return result;
+    }
+
+
+    /**
+    * Reads a sequence of bytes from this channel into the given buffer.
+    * Decrypted data left over in appReadBuffer is handed out first; if dst still has room,
+    * more data is read from the socket channel and unwrapped.
+    *
+    * @param dst The buffer into which bytes are to be transferred
+    * @return The number of bytes transferred into dst, possibly zero (always zero while the handshake
+    *         is not complete), or -1 if this transport layer is closing
+    * @throws IOException an EOFException if the socket channel reached end-of-stream or the SSLEngine
+    *         reports CLOSED, an SSLHandshakeException if the peer requests a renegotiation while
+    *         renegotiation is disabled, or if some other I/O error occurs
+    * @throws IllegalStateException if a buffer overflow/underflow is reported although the buffer
+    *         already has the size requested by the SSL session
+    */
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        if (closing) return -1;
+        int read = 0;
+        if (!handshakeComplete) return read;
+
+        //if we have unread decrypted data in appReadBuffer read that into dst buffer.
+        if (appReadBuffer.position() > 0) {
+            read = readFromAppBuffer(dst);
+        }
+
+        if (dst.remaining() > 0) {
+            netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
+            if (netReadBuffer.remaining() > 0) {
+                int netread = socketChannel.read(netReadBuffer);
+                // Only give up early when there is nothing at all to unwrap. Report the bytes already
+                // copied out of appReadBuffer instead of 0 so they are not lost for the caller, and
+                // keep going when netReadBuffer still holds previously buffered network data.
+                if (netread == 0 && netReadBuffer.position() == 0) return read;
+                else if (netread < 0) throw new EOFException("EOF during read");
+            }
+            do {
+                netReadBuffer.flip();
+                SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
+                netReadBuffer.compact();
+                // handle ssl renegotiation.
+                if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getStatus() == Status.OK) {
+                    log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    renegotiate();
+                    break;
+                }
+
+                if (unwrapResult.getStatus() == Status.OK) {
+                    read += readFromAppBuffer(dst);
+                } else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                    int currentApplicationBufferSize = applicationBufferSize();
+                    appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize);
+                    if (appReadBuffer.position() >= currentApplicationBufferSize) {
+                        throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
+                                                        ") >= application buffer size (" + currentApplicationBufferSize + ")");
+                    }
+
+                    // appReadBuffer will be extended up to currentApplicationBufferSize.
+                    // We need to move the existing content into dst before we can unwrap again.
+                    // If there is no space left in dst we stop here.
+                    if (dst.hasRemaining())
+                        read += readFromAppBuffer(dst);
+                    else
+                        break;
+                } else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                    // not enough network data for a complete record: make sure the buffer is big
+                    // enough and wait for more data to arrive
+                    int currentNetReadBufferSize = netReadBufferSize();
+                    netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
+                    if (netReadBuffer.position() >= currentNetReadBufferSize) {
+                        throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() +
+                                                        ") > packet buffer size (" + currentNetReadBufferSize + ")");
+                    }
+                    break;
+                } else if (unwrapResult.getStatus() == Status.CLOSED) {
+                    throw new EOFException();
+                }
+            } while (netReadBuffer.position() != 0);
+        }
+        return read;
+    }
+
+
+    /**
+     * Reads a sequence of bytes from this channel into the given buffers,
+     * delegating to the offset/length variant over the whole array.
+     *
+     * @param dsts - The buffers into which bytes are to be transferred.
+     * @return The number of bytes read, possibly zero.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts) throws IOException {
+        final int bufferCount = dsts.length;
+        return read(dsts, 0, bufferCount);
+    }
+
+
+    /**
+     * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
+     * Buffers are filled in order; reading stops as soon as a read transfers no bytes.
+     *
+     * @param dsts - The buffers into which bytes are to be transferred
+     * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
+     * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
+     * @return The number of bytes read, possibly zero.
+     * @throws IOException if some other I/O error occurs
+     * @throws IndexOutOfBoundsException if offset or length violate the constraints above
+     */
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
+            throw new IndexOutOfBoundsException();
+
+        // length is a count of buffers, not an end index: the last buffer to use is at
+        // offset + length - 1. The sum cannot overflow, the check above bounds it by dsts.length.
+        final int end = offset + length;
+        long totalRead = 0;
+        int i = offset;
+        while (i < end) {
+            if (dsts[i].hasRemaining()) {
+                int read = read(dsts[i]);
+                if (read > 0)
+                    totalRead += read;
+                else
+                    break;
+            }
+            // only move on once the current buffer is full
+            if (!dsts[i].hasRemaining()) {
+                i++;
+            }
+        }
+        return totalRead;
+    }
+
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffer.
+    * Data still pending in netWriteBuffer is flushed first; only if that succeeds is new data
+    * from src encrypted and written out.
+    *
+    * @param src The buffer from which bytes are to be retrieved
+    * @return The number of bytes of src consumed by SSLEngine.wrap, possibly zero. Zero is returned
+    *         while the handshake is not complete, while previously wrapped data could not be flushed,
+    *         when a renegotiation is triggered and when netWriteBuffer had to be enlarged
+    * @throws IOException an EOFException if the SSLEngine reports CLOSED, or if some other I/O error occurs
+    * @throws IllegalStateException if this transport layer is closing, or on an unexpected buffer state
+    */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        int written = 0;
+        if (closing) throw new IllegalStateException("Channel is in closing state");
+        if (!handshakeComplete) return written;
+
+        //previously wrapped data has to go out before netWriteBuffer can be reused
+        if (!flush(netWriteBuffer))
+            return written;
+
+        netWriteBuffer.clear();
+        SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
+        netWriteBuffer.flip();
+
+        //handle ssl renegotiation
+        if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) {
+            renegotiate();
+            return written;
+        }
+
+        if (wrapResult.getStatus() == Status.OK) {
+            written = wrapResult.bytesConsumed();
+            //best effort: whatever is not written now stays in netWriteBuffer as a pending write
+            flush(netWriteBuffer);
+        } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
+            //netWriteBuffer was too small: grow it to the session's current packet size
+            int currentNetWriteBufferSize = netWriteBufferSize();
+            netWriteBuffer.compact();
+            netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
+            netWriteBuffer.flip();
+            if (netWriteBuffer.limit() >= currentNetWriteBufferSize)
+                throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.limit() + ") >= network buffer size (" + currentNetWriteBufferSize + ")");
+        } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+            throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
+        } else if (wrapResult.getStatus() == Status.CLOSED) {
+            throw new EOFException();
+        }
+        return written;
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
+    * Buffers are written in order; writing stops at the first buffer that could not be
+    * written out completely.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
+    * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
+    * @return returns no.of bytes written , possibly zero.
+    * @throws IOException If some other I/O error occurs
+    * @throws IndexOutOfBoundsException if offset or length violate the constraints above
+    */
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
+            throw new IndexOutOfBoundsException();
+        // length is a count of buffers, not an end index: the last buffer to use is at
+        // offset + length - 1. The sum cannot overflow, the check above bounds it by srcs.length.
+        final int end = offset + length;
+        long totalWritten = 0;
+        int i = offset;
+        while (i < end) {
+            if (srcs[i].hasRemaining() || hasPendingWrites()) {
+                int written = write(srcs[i]);
+                if (written > 0) {
+                    totalWritten += written;
+                }
+            }
+            if (!srcs[i].hasRemaining() && !hasPendingWrites()) {
+                i++;
+            } else {
+                // if we are unable to write the current buffer to socketChannel we should break,
+                // as we might have reached max socket send buffer size.
+                break;
+            }
+        }
+        return totalWritten;
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffers,
+    * delegating to the offset/length variant over the whole array.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @return returns no.of bytes consumed by SSLEngine.wrap , possibly zero.
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs) throws IOException {
+        final int bufferCount = srcs.length;
+        return write(srcs, 0, bufferCount);
+    }
+
+
+    /**
+     * Looks up the peer principal of the current SSLSession.
+     * @return Principal of the remote host, or KafkaPrincipal.ANONYMOUS if the peer is not verified
+     */
+    public Principal peerPrincipal() throws IOException {
+        final SSLSession session = sslEngine.getSession();
+        try {
+            return session.getPeerPrincipal();
+        } catch (SSLPeerUnverifiedException unverified) {
+            log.warn("SSL peer is not authenticated, returning ANONYMOUS instead");
+            return KafkaPrincipal.ANONYMOUS;
+        }
+    }
+
+    /**
+     * Returns the SSLSession currently held by the SSLEngine.
+     * The method itself performs no check of the handshake state.
+     */
+    public SSLSession sslSession() throws IllegalStateException {
+        final SSLSession session = sslEngine.getSession();
+        return session;
+    }
+
+    /**
+     * Turns on the given interestOps on the SelectionKey of the TransportLayer.
+     * @param ops SelectionKey interestOps to add
+     * @throws CancelledKeyException if the key is no longer valid
+     * @throws IllegalStateException if the handshake has not completed yet
+     */
+    @Override
+    public void addInterestOps(int ops) {
+        if (!key.isValid()) {
+            throw new CancelledKeyException();
+        }
+        if (!handshakeComplete) {
+            throw new IllegalStateException("handshake is not completed");
+        }
+        final int updatedOps = key.interestOps() | ops;
+        key.interestOps(updatedOps);
+    }
+
+    /**
+     * Turns off the given interestOps on the SelectionKey of the TransportLayer.
+     * @param ops SelectionKey interestOps to remove
+     * @throws CancelledKeyException if the key is no longer valid
+     * @throws IllegalStateException if the handshake has not completed yet
+     */
+    @Override
+    public void removeInterestOps(int ops) {
+        if (!key.isValid()) {
+            throw new CancelledKeyException();
+        }
+        if (!handshakeComplete) {
+            throw new IllegalStateException("handshake is not completed");
+        }
+        final int updatedOps = key.interestOps() & ~ops;
+        key.interestOps(updatedOps);
+    }
+
+
+    /**
+     * Fetches the next delegated task from the SSLEngine; the result is null
+     * when the engine has none to hand out.
+     */
+    protected Runnable delegatedTask() {
+        final Runnable task = sslEngine.getDelegatedTask();
+        return task;
+    }
+
+    /**
+     * Moves as much decrypted data as fits from appReadBuffer into dst and keeps
+     * the rest in appReadBuffer for a later call.
+     * @param dst ByteBuffer receiving the data
+     * @return number of bytes transferred into dst
+     */
+    private int readFromAppBuffer(ByteBuffer dst) {
+        // switch appReadBuffer to read mode
+        appReadBuffer.flip();
+        final int transferable = Math.min(appReadBuffer.remaining(), dst.remaining());
+        if (transferable > 0) {
+            // temporarily cap the limit so that put() copies exactly `transferable` bytes
+            final int savedLimit = appReadBuffer.limit();
+            appReadBuffer.limit(appReadBuffer.position() + transferable);
+            dst.put(appReadBuffer);
+            appReadBuffer.limit(savedLimit);
+        }
+        // keep what was not transferred and switch back to write mode
+        appReadBuffer.compact();
+        return transferable;
+    }
+
+    /**
+     * Size used for netReadBuffer: the packet buffer size of the current SSL session.
+     */
+    protected int netReadBufferSize() {
+        final SSLSession session = sslEngine.getSession();
+        return session.getPacketBufferSize();
+    }
+    
+    /**
+     * Size used for netWriteBuffer: the packet buffer size of the current SSL session.
+     */
+    protected int netWriteBufferSize() {
+        final SSLSession session = sslEngine.getSession();
+        return session.getPacketBufferSize();
+    }
+
+    /**
+     * Size used for appReadBuffer: the application buffer size of the current SSL session.
+     */
+    protected int applicationBufferSize() {
+        final SSLSession session = sslEngine.getSession();
+        return session.getApplicationBufferSize();
+    }
+    
+    /**
+     * Gives subclasses access to the buffer holding data read from the network.
+     */
+    protected ByteBuffer netReadBuffer() {
+        return this.netReadBuffer;
+    }
+
+    /**
+     * Closes both directions of the SSLEngine after a failed handshake so that it can
+     * release the resources (such as internal buffers) it is managing.
+     */
+    private void handshakeFailure() {
+        sslEngine.closeOutbound();
+        try {
+            sslEngine.closeInbound();
+        } catch (SSLException closeError) {
+            // the failure is only logged at debug level
+            log.debug("SSLEngine.closeInBound() raised an exception.", closeError);
+        }
+    }
+
+    /**
+     * A layer counts as muted when its key is still valid but is not interested in OP_READ.
+     */
+    @Override
+    public boolean isMute() {
+        if (!key.isValid()) {
+            return false;
+        }
+        final int readInterest = key.interestOps() & SelectionKey.OP_READ;
+        return readInterest == 0;
+    }
+
+    /**
+     * Transfers up to count bytes of fileChannel, starting at position, into this transport layer
+     * by handing this layer to FileChannel.transferTo as the target channel.
+     */
+    @Override
+    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
+        final long transferred = fileChannel.transferTo(position, count, this);
+        return transferred;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index 591fb8d..7459774 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -74,7 +74,8 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
     boolean hasPendingWrites();
 
     /**
-     * Returns `SSLSession.getPeerPrincipal` if SSLTransportLayer is used and `KakfaPrincipal.ANONYMOUS` otherwise.
+     * Returns `SSLSession.getPeerPrincipal()` if this is an SslTransportLayer and there is an authenticated peer;
+     * `KafkaPrincipal.ANONYMOUS` is returned otherwise.
      */
     Principal peerPrincipal() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
deleted file mode 100644
index c25993e..0000000
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.security.ssl;
-
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.network.Mode;
-
-import javax.net.ssl.*;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.security.KeyStore;
-
-import java.util.List;
-import java.util.Map;
-
-public class SSLFactory implements Configurable {
-
-    private String protocol;
-    private String provider;
-    private String kmfAlgorithm;
-    private String tmfAlgorithm;
-    private SecurityStore keystore = null;
-    private String keyPassword;
-    private SecurityStore truststore;
-    private String[] cipherSuites;
-    private String[] enabledProtocols;
-    private String endpointIdentification;
-    private SSLContext sslContext;
-    private boolean needClientAuth;
-    private boolean wantClientAuth;
-    private final Mode mode;
-
-    public SSLFactory(Mode mode) {
-        this.mode = mode;
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs) throws KafkaException {
-        this.protocol =  (String) configs.get(SSLConfigs.SSL_PROTOCOL_CONFIG);
-        this.provider = (String) configs.get(SSLConfigs.SSL_PROVIDER_CONFIG);
-
-
-        List<String> cipherSuitesList = (List<String>) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG);
-        if (cipherSuitesList != null)
-            this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
-
-        List<String> enabledProtocolsList = (List<String>) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
-        if (enabledProtocolsList != null)
-            this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
-
-        String endpointIdentification = (String) configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
-        if (endpointIdentification != null)
-            this.endpointIdentification = endpointIdentification;
-
-        String clientAuthConfig = (String) configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG);
-        if (clientAuthConfig != null) {
-            if (clientAuthConfig.equals("required"))
-                this.needClientAuth = true;
-            else if (clientAuthConfig.equals("requested"))
-                this.wantClientAuth = true;
-        }
-
-        this.kmfAlgorithm = (String) configs.get(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
-        this.tmfAlgorithm = (String) configs.get(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
-
-        createKeystore((String) configs.get(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG),
-                       (String) configs.get(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
-                       (String) configs.get(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
-                       (String) configs.get(SSLConfigs.SSL_KEY_PASSWORD_CONFIG));
-
-        createTruststore((String) configs.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
-                         (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
-                         (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
-        try {
-            this.sslContext = createSSLContext();
-        } catch (Exception e) {
-            throw new KafkaException(e);
-        }
-    }
-
-
-    private SSLContext createSSLContext() throws GeneralSecurityException, IOException  {
-        SSLContext sslContext;
-        if (provider != null)
-            sslContext = SSLContext.getInstance(protocol, provider);
-        else
-            sslContext = SSLContext.getInstance(protocol);
-
-        KeyManager[] keyManagers = null;
-        if (keystore != null) {
-            String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
-            KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
-            KeyStore ks = keystore.load();
-            String keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password;
-            kmf.init(ks, keyPassword.toCharArray());
-            keyManagers = kmf.getKeyManagers();
-        }
-
-        String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm();
-        TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
-        KeyStore ts = truststore == null ? null : truststore.load();
-        tmf.init(ts);
-
-        sslContext.init(keyManagers, tmf.getTrustManagers(), null);
-        return sslContext;
-    }
-
-    public SSLEngine createSSLEngine(String peerHost, int peerPort) {
-        SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
-        if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
-        if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
-
-        if (mode == Mode.SERVER) {
-            sslEngine.setUseClientMode(false);
-            if (needClientAuth)
-                sslEngine.setNeedClientAuth(needClientAuth);
-            else
-                sslEngine.setWantClientAuth(wantClientAuth);
-        } else {
-            sslEngine.setUseClientMode(true);
-            SSLParameters sslParams = sslEngine.getSSLParameters();
-            sslParams.setEndpointIdentificationAlgorithm(endpointIdentification);
-            sslEngine.setSSLParameters(sslParams);
-        }
-        return sslEngine;
-    }
-
-    /**
-     * Returns a configured SSLContext.
-     * @return SSLContext.
-     */
-    public SSLContext sslContext() {
-        return sslContext;
-    }
-
-    private void createKeystore(String type, String path, String password, String keyPassword) {
-        if (path == null && password != null) {
-            throw new KafkaException("SSL key store is not specified, but key store password is specified.");
-        } else if (path != null && password == null) {
-            throw new KafkaException("SSL key store is specified, but key store password is not specified.");
-        } else if (path != null && password != null) {
-            this.keystore = new SecurityStore(type, path, password);
-            this.keyPassword = keyPassword;
-        }
-    }
-
-    private void createTruststore(String type, String path, String password) {
-        if (path == null && password != null) {
-            throw new KafkaException("SSL trust store is not specified, but trust store password is specified.");
-        } else if (path != null && password == null) {
-            throw new KafkaException("SSL trust store is specified, but trust store password is not specified.");
-        } else if (path != null && password != null) {
-            this.truststore = new SecurityStore(type, path, password);
-        }
-    }
-
-    private class SecurityStore {
-        private final String type;
-        private final String path;
-        private final String password;
-
-        private SecurityStore(String type, String path, String password) {
-            this.type = type == null ? KeyStore.getDefaultType() : type;
-            this.path = path;
-            this.password = password;
-        }
-
-        private KeyStore load() throws GeneralSecurityException, IOException {
-            FileInputStream in = null;
-            try {
-                KeyStore ks = KeyStore.getInstance(type);
-                in = new FileInputStream(path);
-                ks.load(in, password.toCharArray());
-                return ks;
-            } finally {
-                if (in != null) in.close();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
new file mode 100644
index 0000000..0984ba0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.network.Mode;
+
+import javax.net.ssl.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+
+import java.util.List;
+import java.util.Map;
+
+public class SslFactory implements Configurable {
+
+    private String protocol;
+    private String provider;
+    private String kmfAlgorithm;
+    private String tmfAlgorithm;
+    private SecurityStore keystore = null;
+    private String keyPassword;
+    private SecurityStore truststore;
+    private String[] cipherSuites;
+    private String[] enabledProtocols;
+    private String endpointIdentification;
+    private SSLContext sslContext;
+    private boolean needClientAuth;
+    private boolean wantClientAuth;
+    private final Mode mode;
+
+    public SslFactory(Mode mode) {
+        this.mode = mode;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        this.protocol =  (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
+        this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);
+
+
+        List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
+        if (cipherSuitesList != null)
+            this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
+
+        List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
+        if (enabledProtocolsList != null)
+            this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
+
+        String endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+        if (endpointIdentification != null)
+            this.endpointIdentification = endpointIdentification;
+
+        String clientAuthConfig = (String) configs.get(SslConfigs.SSL_CLIENT_AUTH_CONFIG);
+        if (clientAuthConfig != null) {
+            if (clientAuthConfig.equals("required"))
+                this.needClientAuth = true;
+            else if (clientAuthConfig.equals("requested"))
+                this.wantClientAuth = true;
+        }
+
+        this.kmfAlgorithm = (String) configs.get(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
+        this.tmfAlgorithm = (String) configs.get(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
+
+        createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+                       (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+                       (String) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+                       (String) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
+
+        createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
+                         (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
+                         (String) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+        try {
+            this.sslContext = createSSLContext();
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+
+    private SSLContext createSSLContext() throws GeneralSecurityException, IOException  {
+        SSLContext sslContext;
+        if (provider != null)
+            sslContext = SSLContext.getInstance(protocol, provider);
+        else
+            sslContext = SSLContext.getInstance(protocol);
+
+        KeyManager[] keyManagers = null;
+        if (keystore != null) {
+            String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
+            KeyStore ks = keystore.load();
+            String keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password;
+            kmf.init(ks, keyPassword.toCharArray());
+            keyManagers = kmf.getKeyManagers();
+        }
+
+        String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm();
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
+        KeyStore ts = truststore == null ? null : truststore.load();
+        tmf.init(ts);
+
+        sslContext.init(keyManagers, tmf.getTrustManagers(), null);
+        return sslContext;
+    }
+
+    public SSLEngine createSslEngine(String peerHost, int peerPort) {
+        SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
+        if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
+        if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
+
+        if (mode == Mode.SERVER) {
+            sslEngine.setUseClientMode(false);
+            if (needClientAuth)
+                sslEngine.setNeedClientAuth(needClientAuth);
+            else
+                sslEngine.setWantClientAuth(wantClientAuth);
+        } else {
+            sslEngine.setUseClientMode(true);
+            SSLParameters sslParams = sslEngine.getSSLParameters();
+            sslParams.setEndpointIdentificationAlgorithm(endpointIdentification);
+            sslEngine.setSSLParameters(sslParams);
+        }
+        return sslEngine;
+    }
+
+    /**
+     * Returns a configured SSLContext.
+     * @return SSLContext.
+     */
+    public SSLContext sslContext() {
+        return sslContext;
+    }
+
+    private void createKeystore(String type, String path, String password, String keyPassword) {
+        if (path == null && password != null) {
+            throw new KafkaException("SSL key store is not specified, but key store password is specified.");
+        } else if (path != null && password == null) {
+            throw new KafkaException("SSL key store is specified, but key store password is not specified.");
+        } else if (path != null && password != null) {
+            this.keystore = new SecurityStore(type, path, password);
+            this.keyPassword = keyPassword;
+        }
+    }
+
+    private void createTruststore(String type, String path, String password) {
+        if (path == null && password != null) {
+            throw new KafkaException("SSL trust store is not specified, but trust store password is specified.");
+        } else if (path != null && password == null) {
+            throw new KafkaException("SSL trust store is specified, but trust store password is not specified.");
+        } else if (path != null && password != null) {
+            this.truststore = new SecurityStore(type, path, password);
+        }
+    }
+
+    private class SecurityStore {
+        private final String type;
+        private final String path;
+        private final String password;
+
+        private SecurityStore(String type, String path, String password) {
+            this.type = type == null ? KeyStore.getDefaultType() : type;
+            this.path = path;
+            this.password = password;
+        }
+
+        private KeyStore load() throws GeneralSecurityException, IOException {
+            FileInputStream in = null;
+            try {
+                KeyStore ks = KeyStore.getInstance(type);
+                in = new FileInputStream(path);
+                ks.load(in, password.toCharArray());
+                return ks;
+            } finally {
+                if (in != null) in.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index b044cf4..b96a5f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockSerializer;
@@ -59,7 +59,7 @@ public class KafkaProducerTest {
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
         configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL);
-        configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
         final int oldInitCount = MockSerializer.INIT_COUNT.get();
         final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
index 4a6d304..9354bfe 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -13,7 +13,7 @@
 package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocket;
@@ -38,14 +38,14 @@ class EchoServer extends Thread {
     private final List<Thread> threads;
     private final List<Socket> sockets;
     private SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
-    private SSLFactory sslFactory;
+    private SslFactory sslFactory;
     private final AtomicBoolean renegotiate = new AtomicBoolean();
 
     public EchoServer(Map<String, ?> configs) throws Exception {
         this.protocol =  configs.containsKey("security.protocol") ?
             SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT;
         if (protocol == SecurityProtocol.SSL) {
-            this.sslFactory = new SSLFactory(Mode.SERVER);
+            this.sslFactory = new SslFactory(Mode.SERVER);
             this.sslFactory.configure(configs);
             SSLContext sslContext = this.sslFactory.sslContext();
             this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
deleted file mode 100644
index eee7531..0000000
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.network;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.security.ssl.SSLFactory;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.test.TestSSLUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses.
- */
-public class SSLSelectorTest extends SelectorTest {
-
-    private Metrics metrics;
-    private Map<String, Object> sslClientConfigs;
-
-    @Before
-    public void setup() throws Exception {
-        File trustStoreFile = File.createTempFile("truststore", ".jks");
-
-        Map<String, Object> sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server");
-        sslServerConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
-        this.server = new EchoServer(sslServerConfigs);
-        this.server.start();
-        this.time = new MockTime();
-        sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, Mode.SERVER, trustStoreFile, "client");
-        sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
-
-        this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
-        this.channelBuilder.configure(sslClientConfigs);
-        this.metrics = new Metrics();
-        this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
-    }
-
-    @After
-    public void teardown() throws Exception {
-        this.selector.close();
-        this.server.close();
-        this.metrics.close();
-    }
-
-    /**
-     * Tests that SSL renegotiation initiated by the server are handled correctly by the client
-     * @throws Exception
-     */
-    @Test
-    public void testRenegotiation() throws Exception {
-        ChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT) {
-            @Override
-            protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
-                SocketChannel socketChannel = (SocketChannel) key.channel();
-                SSLTransportLayer transportLayer = new SSLTransportLayer(id, key,
-                    sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()),
-                    true);
-                transportLayer.startHandshake();
-                return transportLayer;
-            }
-        };
-        channelBuilder.configure(sslClientConfigs);
-        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", new LinkedHashMap<String, String>(), channelBuilder);
-        try {
-            int reqs = 500;
-            String node = "0";
-            // create connections
-            InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-            selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-            // send echo requests and receive responses
-            int requests = 0;
-            int responses = 0;
-            int renegotiates = 0;
-            while (!selector.isChannelReady(node)) {
-                selector.poll(1000L);
-            }
-            selector.send(createSend(node, node + "-" + 0));
-            requests++;
-
-            // loop until we complete all requests
-            while (responses < reqs) {
-                selector.poll(0L);
-                if (responses >= 100 && renegotiates == 0) {
-                    renegotiates++;
-                    server.renegotiate();
-                }
-                assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
-
-                // handle any responses we may have gotten
-                for (NetworkReceive receive : selector.completedReceives()) {
-                    String[] pieces = asString(receive).split("-");
-                    assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
-                    assertEquals("Check the source", receive.source(), pieces[0]);
-                    assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
-                    assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1]));
-                    responses++;
-                }
-
-                // prepare new sends for the next round
-                for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); i++, requests++) {
-                    selector.send(createSend(node, node + "-" + requests));
-                }
-            }
-        } finally {
-            selector.close();
-        }
-    }
-
-    @Test
-    public void testDisabledRenegotiation() throws Exception {
-        String node = "0";
-        // create connections
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        // send echo requests and receive responses
-        while (!selector.isChannelReady(node)) {
-            selector.poll(1000L);
-        }
-        selector.send(createSend(node, node + "-" + 0));
-        selector.poll(0L);
-        server.renegotiate();
-        selector.send(createSend(node, node + "-" + 1));
-        long expiryTime = System.currentTimeMillis() + 2000;
-
-        List<String> disconnected = new ArrayList<>();
-        while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
-            selector.poll(10);
-            disconnected.addAll(selector.disconnected());
-        }
-        assertTrue("Renegotiation should cause disconnection", disconnected.contains(node));
-
-    }
-
-    /**
-     * Connects and waits for handshake to complete. This is required since SSLTransportLayer
-     * implementation requires the channel to be ready before send is invoked (unlike plaintext
-     * where send can be invoked straight after connect)
-     */
-    protected void connect(String node, InetSocketAddress serverAddr) throws IOException {
-        blockingConnect(node, serverAddr);
-    }
-
-}


Mime
View raw message