kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2618; Disable SSL renegotiation for 0.9.0.0
Date Wed, 21 Oct 2015 21:39:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 300565381 -> 361686d4a


KAFKA-2618; Disable SSL renegotiation for 0.9.0.0

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Sriharsha Chintalapani <schintalapani@hortonworks.com>, Rajini Sivaram <rajinisivaram@googlemail.com>,
Jun Rao <junrao@gmail.com>

Closes #339 from ijuma/kafka-2618-disable-renegotiation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/361686d4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/361686d4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/361686d4

Branch: refs/heads/trunk
Commit: 361686d4a999298b6e5b63cacda72168172eb936
Parents: 3005653
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed Oct 21 14:39:39 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 21 14:39:39 2015 -0700

----------------------------------------------------------------------
 .../kafka/common/network/SSLTransportLayer.java |  23 +++-
 .../kafka/common/network/SSLSelectorTest.java   | 110 ++++++++++++++-----
 .../common/network/SSLTransportLayerTest.java   |   2 +-
 .../kafka/common/network/SelectorTest.java      |   2 +-
 4 files changed, 99 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/361686d4/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
index e7afa02..813f0b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
@@ -31,6 +31,7 @@ import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
 import javax.net.ssl.SSLSession;
 import javax.net.ssl.SSLPeerUnverifiedException;
 
@@ -49,6 +50,8 @@ public class SSLTransportLayer implements TransportLayer {
     private final SSLEngine sslEngine;
     private final SelectionKey key;
     private final SocketChannel socketChannel;
+    private final boolean enableRenegotiation;
+
     private HandshakeStatus handshakeStatus;
     private SSLEngineResult handshakeResult;
     private boolean handshakeComplete = false;
@@ -59,17 +62,19 @@ public class SSLTransportLayer implements TransportLayer {
     private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
 
     public static SSLTransportLayer create(String channelId, SelectionKey key, SSLEngine
sslEngine) throws IOException {
-        SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine);
+        // Disable renegotiation by default until we have fixed the known issues with the
existing implementation
+        SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine,
false);
         transportLayer.startHandshake();
         return transportLayer;
     }
 
     // Prefer `create`, only use this in tests
-    SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException
{
+    SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation)
throws IOException {
         this.channelId = channelId;
         this.key = key;
         this.socketChannel = (SocketChannel) key.channel();
         this.sslEngine = sslEngine;
+        this.enableRenegotiation = enableRenegotiation;
     }
 
     /**
@@ -305,6 +310,12 @@ public class SSLTransportLayer implements TransportLayer {
         }
     }
 
+    private void renegotiate() throws IOException {
+        if (!enableRenegotiation)
+            throw new SSLHandshakeException("Renegotiation is not supported");
+        handshake();
+    }
+
 
     /**
      * Executes the SSLEngine tasks needed.
@@ -435,10 +446,10 @@ public class SSLTransportLayer implements TransportLayer {
                 SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
                 netReadBuffer.compact();
                 // handle ssl renegotiation.
-                if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING)
{
+                if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING
&& unwrapResult.getStatus() == Status.OK) {
                     log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer
pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
                               channelId, appReadBuffer.position(), netReadBuffer.position(),
netWriteBuffer.position());
-                    handshake();
+                    renegotiate();
                     break;
                 }
 
@@ -541,8 +552,8 @@ public class SSLTransportLayer implements TransportLayer {
         netWriteBuffer.flip();
 
         //handle ssl renegotiation
-        if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
-            handshake();
+        if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
wrapResult.getStatus() == Status.OK) {
+            renegotiate();
             return written;
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/361686d4/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
index 6475ff0..eee7531 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
@@ -13,8 +13,13 @@
 package org.apache.kafka.common.network;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.io.File;
 import java.io.IOException;
@@ -22,6 +27,7 @@ import java.net.InetSocketAddress;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.security.ssl.SSLFactory;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestSSLUtils;
 import org.junit.After;
@@ -34,6 +40,7 @@ import org.junit.Test;
 public class SSLSelectorTest extends SelectorTest {
 
     private Metrics metrics;
+    private Map<String, Object> sslClientConfigs;
 
     @Before
     public void setup() throws Exception {
@@ -44,7 +51,7 @@ public class SSLSelectorTest extends SelectorTest {
         this.server = new EchoServer(sslServerConfigs);
         this.server.start();
         this.time = new MockTime();
-        Map<String, Object> sslClientConfigs = TestSSLUtils.createSSLConfig(false,
false, Mode.SERVER, trustStoreFile, "client");
+        sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, Mode.SERVER, trustStoreFile,
"client");
         sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
 
         this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
@@ -66,46 +73,89 @@ public class SSLSelectorTest extends SelectorTest {
      */
     @Test
     public void testRenegotiation() throws Exception {
-        int reqs = 500;
+        ChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT) {
+            @Override
+            protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String
id, SelectionKey key) throws IOException {
+                SocketChannel socketChannel = (SocketChannel) key.channel();
+                SSLTransportLayer transportLayer = new SSLTransportLayer(id, key,
+                    sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
socketChannel.socket().getPort()),
+                    true);
+                transportLayer.startHandshake();
+                return transportLayer;
+            }
+        };
+        channelBuilder.configure(sslClientConfigs);
+        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", new LinkedHashMap<String,
String>(), channelBuilder);
+        try {
+            int reqs = 500;
+            String node = "0";
+            // create connections
+            InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+            selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+            // send echo requests and receive responses
+            int requests = 0;
+            int responses = 0;
+            int renegotiates = 0;
+            while (!selector.isChannelReady(node)) {
+                selector.poll(1000L);
+            }
+            selector.send(createSend(node, node + "-" + 0));
+            requests++;
+
+            // loop until we complete all requests
+            while (responses < reqs) {
+                selector.poll(0L);
+                if (responses >= 100 && renegotiates == 0) {
+                    renegotiates++;
+                    server.renegotiate();
+                }
+                assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+                // handle any responses we may have gotten
+                for (NetworkReceive receive : selector.completedReceives()) {
+                    String[] pieces = asString(receive).split("-");
+                    assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
+                    assertEquals("Check the source", receive.source(), pieces[0]);
+                    assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
+                    assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1]));
+                    responses++;
+                }
+
+                // prepare new sends for the next round
+                for (int i = 0; i < selector.completedSends().size() && requests
< reqs && selector.isChannelReady(node); i++, requests++) {
+                    selector.send(createSend(node, node + "-" + requests));
+                }
+            }
+        } finally {
+            selector.close();
+        }
+    }
+
+    @Test
+    public void testDisabledRenegotiation() throws Exception {
         String node = "0";
         // create connections
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
         // send echo requests and receive responses
-        int requests = 0;
-        int responses = 0;
-        int renegotiates = 0;
         while (!selector.isChannelReady(node)) {
             selector.poll(1000L);
         }
         selector.send(createSend(node, node + "-" + 0));
-        requests++;
-
-        // loop until we complete all requests
-        while (responses < reqs) {
-            selector.poll(0L);
-            if (responses >= 100 && renegotiates == 0) {
-                renegotiates++;
-                server.renegotiate();
-            }
-            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
-
-            // handle any responses we may have gotten
-            for (NetworkReceive receive : selector.completedReceives()) {
-                String[] pieces = asString(receive).split("-");
-                assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
-                assertEquals("Check the source", receive.source(), pieces[0]);
-                assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
-                assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1]));
-                responses++;
-            }
-
-            // prepare new sends for the next round
-            for (int i = 0; i < selector.completedSends().size() && requests <
reqs && selector.isChannelReady(node); i++, requests++) {
-                selector.send(createSend(node, node + "-" + requests));
-            }
+        selector.poll(0L);
+        server.renegotiate();
+        selector.send(createSend(node, node + "-" + 1));
+        long expiryTime = System.currentTimeMillis() + 2000;
+
+        List<String> disconnected = new ArrayList<>();
+        while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime)
{
+            selector.poll(10);
+            disconnected.addAll(selector.disconnected());
         }
+        assertTrue("Renegotiation should cause disconnection", disconnected.contains(node));
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/361686d4/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
index 987f4bb..ebb59b5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
@@ -493,7 +493,7 @@ public class SSLTransportLayerTest {
 
         public TestSSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,

                 Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws
IOException {
-            super(channelId, key, sslEngine);
+            super(channelId, key, sslEngine, false);
             this.netReadBufSize = new ResizeableBufferSize(netReadBufSize);
             this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSize);
             this.appBufSize = new ResizeableBufferSize(appBufSize);

http://git-wip-us.apache.org/repos/asf/kafka/blob/361686d4/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 6aa60ce..683eeee 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -42,7 +42,7 @@ public class SelectorTest {
 
     protected EchoServer server;
     protected Time time;
-    protected Selectable selector;
+    protected Selector selector;
     protected ChannelBuilder channelBuilder;
     private Metrics metrics;
 


Mime
View raw message