kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [2/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed by Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao
Date Wed, 19 Aug 2015 04:53:36 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
new file mode 100644
index 0000000..f79b65c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import java.util.Map;
+import java.util.List;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+
+import javax.net.ssl.*;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.SSLConfigs;
+
+
+public class SSLFactory implements Configurable {
+
+    public enum Mode { CLIENT, SERVER };
+    private String protocol;
+    private String provider;
+    private String kmfAlgorithm;
+    private String tmfAlgorithm;
+    private SecurityStore keystore = null;
+    private String keyPassword;
+    private SecurityStore truststore;
+    private String[] cipherSuites;
+    private String[] enabledProtocols;
+    private String endpointIdentification;
+    private SSLContext sslContext;
+    private boolean needClientAuth;
+    private boolean wantClientAuth;
+    private final Mode mode;
+
+
+    public SSLFactory(Mode mode) {
+        this.mode = mode;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        this.protocol =  (String) configs.get(SSLConfigs.SSL_PROTOCOL_CONFIG);
+        this.provider = (String) configs.get(SSLConfigs.SSL_PROVIDER_CONFIG);
+
+        if (configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG) != null) {
+            List<String> cipherSuitesList = (List<String>) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG);
+            this.cipherSuites = (String[]) cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
+        }
+
+        if (configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG) != null) {
+            List<String> enabledProtocolsList = (List<String>) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
+            this.enabledProtocols =  (String[]) enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
+        }
+
+        if (configs.containsKey(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)) {
+            this.endpointIdentification = (String) configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+        }
+
+        if (configs.containsKey(SSLConfigs.SSL_CLIENT_AUTH_CONFIG)) {
+            String clientAuthConfig = (String) configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG);
+            if (clientAuthConfig.equals("required"))
+                this.needClientAuth = true;
+            else if (clientAuthConfig.equals("requested"))
+                this.wantClientAuth = true;
+        }
+
+        this.kmfAlgorithm = (String) configs.get(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
+        this.tmfAlgorithm = (String) configs.get(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
+
+        if (checkKeyStoreConfigs(configs)) {
+            createKeystore((String) configs.get(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+                           (String) configs.get(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+                           (String) configs.get(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+                           (String) configs.get(SSLConfigs.SSL_KEY_PASSWORD_CONFIG));
+        }
+
+        createTruststore((String) configs.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
+                         (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
+                         (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+        try {
+            this.sslContext = createSSLContext();
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+
+    private SSLContext createSSLContext() throws GeneralSecurityException, IOException  {
+        SSLContext sslContext;
+        if (provider != null)
+            sslContext = SSLContext.getInstance(protocol, provider);
+        else
+            sslContext = SSLContext.getInstance(protocol);
+
+        KeyManager[] keyManagers = null;
+        if (keystore != null) {
+            String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
+            KeyStore ks = keystore.load();
+            String keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password;
+            kmf.init(ks, keyPassword.toCharArray());
+            keyManagers = kmf.getKeyManagers();
+        }
+
+        String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm();
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
+        KeyStore ts = truststore == null ? null : truststore.load();
+        tmf.init(ts);
+
+        sslContext.init(keyManagers, tmf.getTrustManagers(), null);
+        return sslContext;
+    }
+
+    public SSLEngine createSSLEngine(String peerHost, int peerPort) {
+        SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
+        if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
+        if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
+
+        if (mode == Mode.SERVER) {
+            sslEngine.setUseClientMode(false);
+            if (needClientAuth)
+                sslEngine.setNeedClientAuth(needClientAuth);
+            else
+                sslEngine.setWantClientAuth(wantClientAuth);
+        } else {
+            sslEngine.setUseClientMode(true);
+            SSLParameters sslParams = sslEngine.getSSLParameters();
+            sslParams.setEndpointIdentificationAlgorithm(endpointIdentification);
+            sslEngine.setSSLParameters(sslParams);
+        }
+        return sslEngine;
+    }
+
+    /**
+     * Returns a configured SSLContext.
+     * @return SSLContext.
+     */
+    public SSLContext sslContext() {
+        return sslContext;
+    }
+
+    private void createKeystore(String type, String path, String password, String keyPassword) {
+        if (path == null && password != null) {
+            throw new KafkaException("SSL key store is not specified, but key store password is specified.");
+        } else if (path != null && password == null) {
+            throw new KafkaException("SSL key store is specified, but key store password is not specified.");
+        } else if (path != null && password != null) {
+            this.keystore = new SecurityStore(type, path, password);
+            this.keyPassword = keyPassword;
+        }
+    }
+
+    private void createTruststore(String type, String path, String password) {
+        if (path == null && password != null) {
+            throw new KafkaException("SSL trust store is not specified, but trust store password is specified.");
+        } else if (path != null && password == null) {
+            throw new KafkaException("SSL trust store is specified, but trust store password is not specified.");
+        } else if (path != null && password != null) {
+            this.truststore = new SecurityStore(type, path, password);
+        }
+    }
+
+    private boolean checkKeyStoreConfigs(Map<String, ?> configs) {
+        return  configs.containsKey(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG) &&
+                configs.containsKey(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG) &&
+                configs.containsKey(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) &&
+                configs.containsKey(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
+    }
+
+    private class SecurityStore {
+        private final String type;
+        private final String path;
+        private final String password;
+
+        private SecurityStore(String type, String path, String password) {
+            this.type = type == null ? KeyStore.getDefaultType() : type;
+            this.path = path;
+            this.password = password;
+        }
+
+        private KeyStore load() throws GeneralSecurityException, IOException {
+            FileInputStream in = null;
+            try {
+                KeyStore ks = KeyStore.getInstance(type);
+                in = new FileInputStream(path);
+                ks.load(in, password.toCharArray());
+                return ks;
+            } finally {
+                if (in != null) in.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 80a914e..c58b741 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -48,7 +48,7 @@ public class Utils {
 
     /**
      * Turn the given UTF8 byte array into a string
-     * 
+     *
      * @param bytes The byte array
      * @return The string
      */
@@ -62,7 +62,7 @@ public class Utils {
 
     /**
      * Turn a string into a utf8 byte[]
-     * 
+     *
      * @param string The string
      * @return The byte[]
      */
@@ -76,7 +76,7 @@ public class Utils {
 
     /**
      * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
-     * 
+     *
      * @param buffer The buffer to read from
      * @return The integer read, as a long to avoid signedness
      */
@@ -86,7 +86,7 @@ public class Utils {
 
     /**
      * Read an unsigned integer from the given position without modifying the buffers position
-     * 
+     *
      * @param buffer the buffer to read from
      * @param index the index from which to read the integer
      * @return The integer read, as a long to avoid signedness
@@ -97,12 +97,12 @@ public class Utils {
 
     /**
      * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
-     * 
+     *
      * @param in The stream to read from
      * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
      */
     public static int readUnsignedIntLE(InputStream in) throws IOException {
-        return (in.read() << 8 * 0) 
+        return (in.read() << 8 * 0)
              | (in.read() << 8 * 1)
              | (in.read() << 8 * 2)
              | (in.read() << 8 * 3);
@@ -111,7 +111,7 @@ public class Utils {
     /**
      * Read an unsigned integer stored in little-endian format from a byte array
      * at a given offset.
-     * 
+     *
      * @param buffer The byte array to read from
      * @param offset The position in buffer to read from
      * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
@@ -125,7 +125,7 @@ public class Utils {
 
     /**
      * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
-     * 
+     *
      * @param buffer The buffer to write to
      * @param value The value to write
      */
@@ -135,7 +135,7 @@ public class Utils {
 
     /**
      * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
-     * 
+     *
      * @param buffer The buffer to write to
      * @param index The position in the buffer at which to begin writing
      * @param value The value to write
@@ -146,7 +146,7 @@ public class Utils {
 
     /**
      * Write an unsigned integer in little-endian format to the {@link OutputStream}.
-     * 
+     *
      * @param out The stream to write to
      * @param value The value to write
      */
@@ -160,7 +160,7 @@ public class Utils {
     /**
      * Write an unsigned integer in little-endian format to a byte array
      * at a given offset.
-     * 
+     *
      * @param buffer The byte array to write to
      * @param offset The position in buffer to write to
      * @param value The value to write
@@ -198,7 +198,7 @@ public class Utils {
 
     /**
      * Get the length for UTF8-encoding a string without encoding it first
-     * 
+     *
      * @param s The string to calculate the length for
      * @return The length when serialized
      */
@@ -244,7 +244,7 @@ public class Utils {
 
     /**
      * Check that the parameter t is not null
-     * 
+     *
      * @param t The object to check
      * @return t if it isn't null
      * @throws NullPointerException if t is null.
@@ -381,7 +381,7 @@ public class Utils {
     public static <T> String join(T[] strs, String seperator) {
         return join(Arrays.asList(strs), seperator);
     }
-    
+
     /**
      * Create a string representation of a list joined by the given separator
      * @param list The list of items
@@ -394,7 +394,7 @@ public class Utils {
         while (iter.hasNext()) {
             sb.append(iter.next());
             if (iter.hasNext())
-                sb.append(seperator);  
+                sb.append(seperator);
         }
         return sb.toString();
     }
@@ -488,7 +488,7 @@ public class Utils {
 
     /**
      * Attempt to read a file as a string
-     * @throws IOException 
+     * @throws IOException
      */
     public static String readFileAsString(String path, Charset charset) throws IOException {
         if (charset == null) charset = Charset.defaultCharset();
@@ -507,4 +507,21 @@ public class Utils {
     public static String readFileAsString(String path) throws IOException {
         return Utils.readFileAsString(path, Charset.defaultCharset());
     }
+
+    /**
+     * Ensure the given ByteBuffer can hold at least newLength bytes, growing it if needed.
+     * @param existingBuffer buffer whose capacity is checked; it is flipped and its contents copied if a larger buffer is allocated
+     * @param newLength minimum required capacity in bytes
+     * @return existingBuffer if its capacity is already sufficient, otherwise a new buffer containing the old buffer's data
+     */
+    public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength) {
+        if (newLength > existingBuffer.capacity()) {
+            ByteBuffer newBuffer = ByteBuffer.allocate(newLength);
+            existingBuffer.flip();
+            newBuffer.put(existingBuffer);
+            return newBuffer;
+        }
+        return existingBuffer;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index 13ce519..d6a4019 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -39,4 +39,4 @@ public class ClientUtilsTest {
     private void check(String... url) {
         ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index f3f8334..d1759ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -18,12 +18,16 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockSerializer;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
 
 public class KafkaProducerTest {
 
@@ -50,17 +54,18 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testSerializerClose() {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-
+    public void testSerializerClose() throws Exception {
+        Map<String, Object> configs = new HashMap<String, Object>();
+        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL);
+        configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
         final int oldInitCount = MockSerializer.INIT_COUNT.get();
         final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
 
         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
-                props, new MockSerializer(), new MockSerializer());
+                configs, new MockSerializer(), new MockSerializer());
         Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
         Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
new file mode 100644
index 0000000..f13c21a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * A simple server that takes size-delimited byte arrays and simply echoes them back to the sender.
+ */
+class EchoServer extends Thread {
+    public final int port;
+    private final ServerSocket serverSocket;
+    private final List<Thread> threads;
+    private final List<Socket> sockets;
+    private SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
+    private SSLFactory sslFactory;
+    private final AtomicBoolean renegotiate = new AtomicBoolean();
+
+    public EchoServer(Map<String, ?> configs) throws Exception {
+        this.protocol =  configs.containsKey("security.protocol") ?
+            SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT;
+        if (protocol == SecurityProtocol.SSL) {
+            this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER);
+            this.sslFactory.configure(configs);
+            SSLContext sslContext = this.sslFactory.sslContext();
+            this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0);
+        } else {
+            this.serverSocket = new ServerSocket(0);
+        }
+        this.port = this.serverSocket.getLocalPort();
+        this.threads = Collections.synchronizedList(new ArrayList<Thread>());
+        this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
+    }
+
+    public void renegotiate() {
+        renegotiate.set(true);
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                final Socket socket = serverSocket.accept();
+                sockets.add(socket);
+                Thread thread = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            DataInputStream input = new DataInputStream(socket.getInputStream());
+                            DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+                            while (socket.isConnected() && !socket.isClosed()) {
+                                int size = input.readInt();
+                                if (renegotiate.get()) {
+                                    renegotiate.set(false);
+                                    ((SSLSocket) socket).startHandshake();
+                                }
+                                byte[] bytes = new byte[size];
+                                input.readFully(bytes);
+                                output.writeInt(size);
+                                output.write(bytes);
+                                output.flush();
+                            }
+                        } catch (IOException e) {
+                            // ignore
+                        } finally {
+                            try {
+                                socket.close();
+                            } catch (IOException e) {
+                                // ignore
+                            }
+                        }
+                    }
+                };
+                thread.start();
+                threads.add(thread);
+            }
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+    public void closeConnections() throws IOException {
+        for (Socket socket : sockets)
+            socket.close();
+    }
+
+    public void close() throws IOException, InterruptedException {
+        this.serverSocket.close();
+        closeConnections();
+        for (Thread t : threads)
+            t.join();
+        join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
new file mode 100644
index 0000000..df1205c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import java.io.IOException;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestSSLUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A set of tests for the selector over SSL. These use a test harness that runs a simple socket server that echoes back responses.
+ */
+
+public class SSLSelectorTest {
+
+    private static final int BUFFER_SIZE = 4 * 1024;
+
+    private EchoServer server;
+    private Selector selector;
+    private ChannelBuilder channelBuilder;
+
+    @Before
+    public void setup() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+
+        Map<String, Object> sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.SERVER, trustStoreFile, "server");
+        sslServerConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        this.server = new EchoServer(sslServerConfigs);
+        this.server.start();
+        Map<String, Object> sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, SSLFactory.Mode.CLIENT, trustStoreFile, "client");
+        sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+
+        this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
+        this.channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        this.selector.close();
+        this.server.close();
+    }
+
+
+    /**
+     * Validate that we can send and receive a message larger than the receive and send buffer size
+     */
+    @Test
+    public void testSendLargeRequest() throws Exception {
+        String node = "0";
+        blockingConnect(node);
+        String big = TestUtils.randomString(10 * BUFFER_SIZE);
+        assertEquals(big, blockingRequest(node, big));
+    }
+
+
+    /**
+     * Validate that when the server disconnects, a client send ends up with that node in the disconnected list.
+     */
+    @Test
+    public void testServerDisconnect() throws Exception {
+        String node = "0";
+        // connect and do a simple request
+        blockingConnect(node);
+        assertEquals("hello", blockingRequest(node, "hello"));
+
+        // disconnect
+        this.server.closeConnections();
+        while (!selector.disconnected().contains(node))
+            selector.poll(1000L);
+
+        // reconnect and do another request
+        blockingConnect(node);
+        assertEquals("hello", blockingRequest(node, "hello"));
+    }
+
+
+    /**
+     * Validate that the client can intentionally disconnect and reconnect
+     */
+    @Test
+    public void testClientDisconnect() throws Exception {
+        String node = "0";
+        blockingConnect(node);
+        selector.disconnect(node);
+        selector.send(createSend(node, "hello1"));
+        selector.poll(10L);
+        assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
+        assertEquals("There should be a disconnect", 1, selector.disconnected().size());
+        assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
+        blockingConnect(node);
+        assertEquals("hello2", blockingRequest(node, "hello2"));
+    }
+
+     /**
+     * Tests that wrap BUFFER_OVERFLOW and unwrap BUFFER_UNDERFLOW are handled correctly
+     * @throws Exception
+     */
+    @Test
+    public void testLargeMessageSequence() throws Exception {
+        int bufferSize = 512 * 1024;
+        String node = "0";
+        int reqs = 50;
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        String requestPrefix = TestUtils.randomString(bufferSize);
+        sendAndReceive(node, requestPrefix, 0, reqs);
+    }
+
+
+    /**
+     * Test sending an empty string
+     */
+    @Test
+    public void testEmptyRequest() throws Exception {
+        String node = "0";
+        blockingConnect(node);
+        assertEquals("", blockingRequest(node, ""));
+    }
+
+
+    /**
+     * Verifies that a muted channel delivers no receives until it is unmuted,
+     * while the unmuted channel's response still comes through.
+     */
+    @Test
+    public void testMute() throws Exception {
+        blockingConnect("0");
+        blockingConnect("1");
+        // wait for the handshake to finish on BOTH channels: keep polling while
+        // EITHER one is still not ready. The previous condition used '&&', which
+        // exits as soon as just one channel is ready (redundant today only because
+        // blockingConnect already waits for readiness).
+        while (!selector.isChannelReady("0") || !selector.isChannelReady("1"))
+            selector.poll(5);
+        selector.send(createSend("0", "hello"));
+        selector.send(createSend("1", "hi"));
+        selector.mute("1");
+
+        while (selector.completedReceives().isEmpty())
+            selector.poll(5);
+        assertEquals("We should have only one response", 1, selector.completedReceives().size());
+        assertEquals("The response should not be from the muted node", "0", selector.completedReceives().get(0).source());
+        selector.unmute("1");
+        // after unmuting, the buffered response from node "1" must be delivered
+        do {
+            selector.poll(5);
+        } while (selector.completedReceives().isEmpty());
+        assertEquals("We should have only one response", 1, selector.completedReceives().size());
+        assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source());
+    }
+
+
+    /**
+     * Tests that SSL renegotiation initiated by the server is handled correctly by the client
+     * @throws Exception
+     */
+    @Test
+    public void testRenegotiation() throws Exception {
+        int reqs = 500;
+        String node = "0";
+        // create connections
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        // send echo requests and receive responses
+        int requests = 0;
+        int responses = 0;
+        int renegotiates = 0;
+        // wait for the initial handshake to complete before the first send
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(createSend(node, node + "-" + 0));
+        requests++;
+
+        // loop until we complete all requests
+        while (responses < reqs) {
+            selector.poll(0L);
+            // trigger exactly one server-initiated renegotiation mid-stream
+            if (responses >= 100 && renegotiates == 0) {
+                renegotiates++;
+                server.renegotiate();
+            }
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+            // handle any responses we may have gotten
+            for (NetworkReceive receive : selector.completedReceives()) {
+                String[] pieces = asString(receive).split("-");
+                assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
+                assertEquals("Check the source", receive.source(), pieces[0]);
+                assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
+                // responses must arrive in send order
+                assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1]));
+                responses++;
+            }
+
+            // prepare new sends for the next round: one per completed send, only while the channel is ready
+            for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); i++, requests++) {
+                selector.send(createSend(node, node + "-" + requests));
+            }
+        }
+    }
+
+    /**
+     * Sends a request on the given connection and polls until the echoed
+     * response from that node arrives, returning its payload as a String.
+     */
+    private String blockingRequest(String node, String s) throws IOException {
+        selector.send(createSend(node, s));
+        while (true) {
+            selector.poll(1000L);
+            for (NetworkReceive receive : selector.completedReceives())
+                // compare node ids by value: the previous reference comparison (==)
+                // only worked when both Strings happened to be the same interned instance
+                if (node.equals(receive.source()))
+                    return asString(receive);
+        }
+    }
+
+    /** Converts a receive's payload bytes to a String (platform default charset — fine for the ASCII payloads used here). */
+    private String asString(NetworkReceive receive) {
+        return new String(Utils.toArray(receive.payload()));
+    }
+
+    /** Wraps the string's bytes (platform default charset) in a NetworkSend destined for the given node. */
+    private NetworkSend createSend(String node, String s) {
+        return new NetworkSend(node, ByteBuffer.wrap(s.getBytes()));
+    }
+
+    /* connect and wait for the connection to complete */
+    private void blockingConnect(String node) throws IOException {
+        selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        // first wait for the TCP connection to be established
+        while (!selector.connected().contains(node))
+            selector.poll(10000L);
+        //finish the handshake as well
+        while (!selector.isChannelReady(node))
+            selector.poll(10000L);
+    }
+
+
+    /**
+     * Pipelines echo requests of the form "&lt;requestPrefix&gt;-&lt;i&gt;" for i in
+     * [startIndex, endIndex) over the given connection and asserts that each
+     * response arrives in order with no disconnects.
+     */
+    private void sendAndReceive(String node, String requestPrefix, int startIndex, int endIndex) throws Exception {
+        int requests = startIndex;
+        int responses = startIndex;
+        // wait for handshake to finish
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(createSend(node, requestPrefix + "-" + startIndex));
+        requests++;
+        while (responses < endIndex) {
+            // do the i/o
+            selector.poll(0L);
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+            // handle requests and responses of the fast node
+            for (NetworkReceive receive : selector.completedReceives()) {
+                // responses must come back in send order
+                assertEquals(requestPrefix + "-" + responses, asString(receive));
+                responses++;
+            }
+
+            // issue one new request per completed send, but only while the channel stays ready
+            for (int i = 0; i < selector.completedSends().size() && requests < endIndex && selector.isChannelReady(node); i++, requests++) {
+                selector.send(createSend(node, requestPrefix + "-" + requests));
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 158f982..3a684d9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -15,16 +15,16 @@ package org.apache.kafka.common.network;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -43,13 +43,18 @@ public class SelectorTest {
     private EchoServer server;
     private Time time;
     private Selectable selector;
+    private ChannelBuilder channelBuilder;
 
     @Before
     public void setup() throws Exception {
-        this.server = new EchoServer();
+        Map<String, Object> configs = new HashMap<String, Object>();
+        configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        this.server = new EchoServer(configs);
         this.server.start();
         this.time = new MockTime();
-        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>());
+        this.channelBuilder = new PlaintextChannelBuilder();
+        this.channelBuilder.configure(configs);
+        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
     }
 
     @After
@@ -208,6 +213,19 @@ public class SelectorTest {
         assertEquals(big, blockingRequest(node, big));
     }
 
+    /** Echoes 50 requests with random payloads of 512 * 1024 characters each over a plaintext connection. */
+    @Test
+    public void testLargeMessageSequence() throws Exception {
+        int bufferSize = 512 * 1024;
+        String node = "0";
+        int reqs = 50;
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        String requestPrefix = TestUtils.randomString(bufferSize);
+        sendAndReceive(node, requestPrefix, 0, reqs);
+    }
+
+
+
     /**
      * Test sending an empty string
      */
@@ -285,71 +303,27 @@ public class SelectorTest {
         return new String(Utils.toArray(receive.payload()));
     }
 
-    /**
-     * A simple server that takes size delimited byte arrays and just echos them back to the sender.
-     */
-    static class EchoServer extends Thread {
-        public final int port;
-        private final ServerSocket serverSocket;
-        private final List<Thread> threads;
-        private final List<Socket> sockets;
-
-        public EchoServer() throws Exception {
-            this.serverSocket = new ServerSocket(0);
-            this.port = this.serverSocket.getLocalPort();
-            this.threads = Collections.synchronizedList(new ArrayList<Thread>());
-            this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
-        }
+    private void sendAndReceive(String node, String requestPrefix, int startIndex, int endIndex) throws Exception {
+        int requests = startIndex;
+        int responses = startIndex;
+        selector.send(createSend(node, requestPrefix + "-" + startIndex));
+        requests++;
+        while (responses < endIndex) {
+            // do the i/o
+            selector.poll(0L);
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
 
-        public void run() {
-            try {
-                while (true) {
-                    final Socket socket = serverSocket.accept();
-                    sockets.add(socket);
-                    Thread thread = new Thread() {
-                        public void run() {
-                            try {
-                                DataInputStream input = new DataInputStream(socket.getInputStream());
-                                DataOutputStream output = new DataOutputStream(socket.getOutputStream());
-                                while (socket.isConnected() && !socket.isClosed()) {
-                                    int size = input.readInt();
-                                    byte[] bytes = new byte[size];
-                                    input.readFully(bytes);
-                                    output.writeInt(size);
-                                    output.write(bytes);
-                                    output.flush();
-                                }
-                            } catch (IOException e) {
-                                // ignore
-                            } finally {
-                                try {
-                                    socket.close();
-                                } catch (IOException e) {
-                                    // ignore
-                                }
-                            }
-                        }
-                    };
-                    thread.start();
-                    threads.add(thread);
-                }
-            } catch (IOException e) {
-                // ignore
+            // handle requests and responses of the fast node
+            for (NetworkReceive receive : selector.completedReceives()) {
+                assertEquals(requestPrefix + "-" + responses, asString(receive));
+                responses++;
             }
-        }
-
-        public void closeConnections() throws IOException {
-            for (Socket socket : sockets)
-                socket.close();
-        }
 
-        public void close() throws IOException, InterruptedException {
-            this.serverSocket.close();
-            closeConnections();
-            for (Thread t : threads)
-                t.join();
-            join();
+            for (int i = 0; i < selector.completedSends().size() && requests < endIndex; i++, requests++) {
+                selector.send(createSend(node, requestPrefix + "-" + requests));
+            }
         }
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
new file mode 100644
index 0000000..0aec666
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import javax.net.ssl.*;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.kafka.test.TestSSLUtils;
+
+import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for SSLFactory: verifies that the SSLEngine instances it creates carry the
+ * configured enabled protocols and the correct client/server mode.
+ */
+
+public class SSLFactoryTest {
+
+    /**
+     * A server-mode factory must produce an engine with the configured enabled
+     * protocols (TLSv1.2) and client mode disabled.
+     */
+    @Test
+    public void testSSLFactoryConfiguration() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.SERVER, trustStoreFile, "server");
+        SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER);
+        sslFactory.configure(serverSSLConfig);
+        //host and port are hints
+        SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
+        assertNotNull(engine);
+        String[] expectedProtocols = {"TLSv1.2"};
+        assertArrayEquals(expectedProtocols, engine.getEnabledProtocols());
+        assertEquals(false, engine.getUseClientMode());
+    }
+
+    /**
+     * A client-mode factory must produce an engine with client mode enabled.
+     */
+    @Test
+    public void testClientMode() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.CLIENT, trustStoreFile, "client");
+        SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT);
+        sslFactory.configure(clientSSLConfig);
+        //host and port are hints
+        SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
+        assertTrue(engine.getUseClientMode());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index e7951d8..74ec52b 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -52,7 +52,7 @@ public class UtilsTest {
         assertEquals("[::1]:1234", formatAddress("::1", 1234));
         assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
     }
-    
+
     @Test
     public void testJoin() {
         assertEquals("", Utils.join(Collections.emptyList(), ","));
@@ -108,4 +108,4 @@ public class UtilsTest {
         assertEquals(1, Utils.min(2, 1, 3));
         assertEquals(1, Utils.min(2, 3, 1));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 51eb9d1..7257cad 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -120,4 +120,8 @@ public class MockSelector implements Selectable {
     public void unmuteAll() {
     }
 
+    @Override
+    public boolean isChannelReady(String id) {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
new file mode 100644
index 0000000..c01cf37
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.clients.CommonClientConfigs;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.math.BigInteger;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.*;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.X509v1CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
+import org.bouncycastle.crypto.util.PrivateKeyFactory;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder;
+import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder;
+import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+
+public class TestSSLUtils {
+
+    /**
+     * Create a self-signed X.509 Certificate.
+     * From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html.
+     *
+     * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
+     * @param pair the KeyPair
+     * @param days how many days from now the Certificate is valid for
+     * @param algorithm the signing algorithm, eg "SHA1withRSA"
+     * @return the self-signed certificate
+     * @throws CertificateException thrown if a security error or an IO error occurred.
+     */
+    public static X509Certificate generateCertificate(String dn, KeyPair pair,
+                                                      int days, String algorithm)
+        throws  CertificateException {
+
+        try {
+            Security.addProvider(new BouncyCastleProvider());
+            AlgorithmIdentifier sigAlgId = new DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
+            AlgorithmIdentifier digAlgId = new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
+            AsymmetricKeyParameter privateKeyAsymKeyParam = PrivateKeyFactory.createKey(pair.getPrivate().getEncoded());
+            SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(pair.getPublic().getEncoded());
+            ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, digAlgId).build(privateKeyAsymKeyParam);
+            X500Name name = new X500Name(dn);
+            Date from = new Date();
+            // validity window starts now; 86400000 ms == 1 day
+            Date to = new Date(from.getTime() + days * 86400000L);
+            BigInteger sn = new BigInteger(64, new SecureRandom());
+
+            // v1 certificate with issuer == subject, i.e. self-signed
+            X509v1CertificateBuilder v1CertGen = new X509v1CertificateBuilder(name, sn, from, to, name, subPubKeyInfo);
+            X509CertificateHolder certificateHolder = v1CertGen.build(sigGen);
+            return new JcaX509CertificateConverter().setProvider("BC").getCertificate(certificateHolder);
+        } catch (CertificateException ce) {
+            throw ce;
+        } catch (Exception e) {
+            // wrap any other failure (crypto, encoding) in the declared exception type
+            throw new CertificateException(e);
+        }
+    }
+
+    /** Generates a fresh key pair for the given algorithm; 1024-bit keys are test-only strength. */
+    public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException {
+        KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
+        keyGen.initialize(1024);
+        return keyGen.genKeyPair();
+    }
+
+    /** Creates and initializes an empty in-memory JKS keystore. */
+    private static KeyStore createEmptyKeyStore() throws GeneralSecurityException, IOException {
+        KeyStore ks = KeyStore.getInstance("JKS");
+        ks.load(null, null); // initialize
+        return ks;
+    }
+
+    /** Writes the keystore to the given file, protected by the given store password. */
+    private static void saveKeyStore(KeyStore ks, String filename,
+                                     String password) throws GeneralSecurityException, IOException {
+        FileOutputStream out = new FileOutputStream(filename);
+        try {
+            ks.store(out, password.toCharArray());
+        } finally {
+            out.close();
+        }
+    }
+
+    /** Creates a keystore with a single key entry (key password same as store password) and saves it to a file. */
+    public static void createKeyStore(String filename,
+                                      String password, String alias,
+                                      Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setKeyEntry(alias, privateKey, password.toCharArray(),
+                new Certificate[]{cert});
+        saveKeyStore(ks, filename, password);
+    }
+
+    /**
+     * Creates a keystore with a single key and saves it to a file.
+     *
+     * @param filename String file to save
+     * @param password String store password to set on keystore
+     * @param keyPassword String key password to set on key
+     * @param alias String alias to use for the key
+     * @param privateKey Key to save in keystore
+     * @param cert Certificate to use as certificate chain associated to key
+     * @throws GeneralSecurityException for any error with the security APIs
+     * @throws IOException if there is an I/O error saving the file
+     */
+    public static void createKeyStore(String filename,
+                                      String password, String keyPassword, String alias,
+                                      Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(),
+                new Certificate[]{cert});
+        saveKeyStore(ks, filename, password);
+    }
+
+    /** Creates a truststore containing a single trusted certificate and saves it to a file. */
+    public static void createTrustStore(String filename,
+                                        String password, String alias,
+                                        Certificate cert) throws GeneralSecurityException, IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setCertificateEntry(alias, cert);
+        saveKeyStore(ks, filename, password);
+    }
+
+    /**
+     * Adds the given certificates to the truststore file, loading the existing
+     * store first so repeated calls accumulate entries.
+     */
+    public static <T extends Certificate> void createTrustStore(
+            String filename, String password, Map<String, T> certs) throws GeneralSecurityException, IOException {
+        KeyStore ks = KeyStore.getInstance("JKS");
+        try {
+            FileInputStream in = new FileInputStream(filename);
+            ks.load(in, password.toCharArray());
+            in.close();
+        } catch (EOFException e) {
+            // file exists but is empty (fresh temp file): start from a new keystore
+            ks = createEmptyKeyStore();
+        }
+        for (Map.Entry<String, T> cert : certs.entrySet()) {
+            ks.setCertificateEntry(cert.getKey(), cert.getValue());
+        }
+        saveKeyStore(ks, filename, password);
+    }
+
+    /** Returns a single-entry map with a self-signed "localhost" certificate built from the key pair. */
+    public static Map<String, X509Certificate> createX509Certificates(KeyPair keyPair)
+        throws GeneralSecurityException {
+        Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+        X509Certificate cert = generateCertificate("CN=localhost, O=localhost", keyPair, 30, "SHA1withRSA");
+        certs.put("localhost", cert);
+        return certs;
+    }
+
+    /**
+     * Builds the SSL config map (protocol, keystore and truststore settings) for the
+     * given mode. Keystore settings are added for servers always, and for clients
+     * only when a keystore file is supplied (i.e. client auth).
+     */
+    public static Map<String, Object> createSSLConfig(SSLFactory.Mode mode, File keyStoreFile, String password, String keyPassword,
+                                                      File trustStoreFile, String trustStorePassword) {
+        Map<String, Object> sslConfigs = new HashMap<String, Object>();
+        sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol
+        sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
+
+        if (mode == SSLFactory.Mode.SERVER || (mode == SSLFactory.Mode.CLIENT && keyStoreFile != null)) {
+            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
+            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+            sslConfigs.put(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
+            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
+            sslConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
+        }
+
+        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
+        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
+        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
+        sslConfigs.put(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
+
+        List<String> enabledProtocols  = new ArrayList<String>();
+        enabledProtocols.add("TLSv1.2");
+        sslConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
+
+        return sslConfigs;
+    }
+
+    /**
+     * Generates a key pair and self-signed certificate for the given mode, writes
+     * them to a temp keystore, optionally registers the certificate under certAlias
+     * in the given truststore file, and returns the resulting SSL config map.
+     */
+    public static  Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, SSLFactory.Mode mode, File trustStoreFile, String certAlias)
+        throws IOException, GeneralSecurityException {
+        Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+        File keyStoreFile;
+        String password;
+
+        if (mode == SSLFactory.Mode.SERVER)
+            password = "ServerPassword";
+        else
+            password = "ClientPassword";
+
+        String trustStorePassword = "TrustStorePassword";
+
+        if (useClientCert) {
+            keyStoreFile = File.createTempFile("clientKS", ".jks");
+            KeyPair cKP = generateKeyPair("RSA");
+            X509Certificate cCert = generateCertificate("CN=localhost, O=client", cKP, 30, "SHA1withRSA");
+            createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert);
+            certs.put(certAlias, cCert);
+        } else {
+            keyStoreFile = File.createTempFile("serverKS", ".jks");
+            KeyPair sKP = generateKeyPair("RSA");
+            X509Certificate sCert = generateCertificate("CN=localhost, O=server", sKP, 30,
+                                                        "SHA1withRSA");
+            createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert);
+            certs.put(certAlias, sCert);
+        }
+
+        if (trustStore) {
+            createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
+        }
+
+        Map<String, Object> sslConfig = createSSLConfig(mode, keyStoreFile, password,
+                                                        password, trustStoreFile, trustStorePassword);
+        return sslConfig;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 0b6b33a..b9efec2 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -24,6 +24,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.api.ApiUtils._
 import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.network.TransportLayer
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.network.MultiSend
 
@@ -56,7 +57,7 @@ class PartitionDataSend(val partitionId: Int,
                         val partitionData: FetchResponsePartitionData) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0
-
+  private var pending = false
   private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
   buffer.putInt(partitionId)
   buffer.putShort(partitionData.error)
@@ -64,7 +65,7 @@ class PartitionDataSend(val partitionId: Int,
   buffer.putInt(partitionData.messages.sizeInBytes)
   buffer.rewind()
 
-  override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize
+  override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize && !pending
 
   override def destination: String = ""
 
@@ -77,6 +78,8 @@ class PartitionDataSend(val partitionId: Int,
       messagesSentSize += bytesSent
       written += bytesSent
     }
+    if (channel.isInstanceOf[TransportLayer])
+      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
     written
   }
 
@@ -111,7 +114,9 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
 
   private var sent = 0L
 
-  override def completed: Boolean = sent >= size
+  private var pending = false
+
+  override def completed: Boolean = sent >= size && !pending
 
   override def destination: String = dest
 
@@ -135,6 +140,10 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
     if(!buffer.hasRemaining && !sends.completed) {
       written += sends.writeTo(channel)
     }
+
+    if (channel.isInstanceOf[TransportLayer])
+      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
+
     sent += written
     written
   }
@@ -214,9 +223,11 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
 
   private var sent = 0L
 
+  private var pending = false
+
   override def size = 4 /* for size byte */ + payloadSize
 
-  override def completed = sent >= size
+  override def completed = sent >= size && !pending
 
   override def destination = dest
 
@@ -242,7 +253,10 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
       written += sends.writeTo(channel)
     }
     sent += written
+
+    if (channel.isInstanceOf[TransportLayer])
+      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
+
     written
   }
 }
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index dbe784b..649812d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -28,10 +28,13 @@ import com.yammer.metrics.core.Gauge
 import kafka.cluster.EndPoint
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.InvalidReceiveException
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.network.{InvalidReceiveException, ChannelBuilder,
+                                        PlaintextChannelBuilder, SSLChannelBuilder}
+import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
@@ -42,28 +45,33 @@ import scala.util.control.{NonFatal, ControlThrowable}
 /**
  * An NIO socket server. The threading model is
  *   1 Acceptor thread that handles new connections
- *   N Processor threads that each have their own selector and read requests from sockets
+ *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
  *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
  */
-class SocketServer(val brokerId: Int,
-                   val endpoints: Map[SecurityProtocol, EndPoint],
-                   val numProcessorThreads: Int,
-                   val maxQueuedRequests: Int,
-                   val sendBufferSize: Int,
-                   val recvBufferSize: Int,
-                   val maxRequestSize: Int = Int.MaxValue,
-                   val maxConnectionsPerIp: Int = Int.MaxValue,
-                   val connectionsMaxIdleMs: Long,
-                   val maxConnectionsPerIpOverrides: Map[String, Int],
-                   val time: Time,
-                   val metrics: Metrics) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Socket Server on Broker " + brokerId + "], "
-
-  private val processors = new Array[Processor](numProcessorThreads)
+class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
+
+  val channelConfigs = config.channelConfigs
+
+  val endpoints = config.listeners
+  val numProcessorThreads = config.numNetworkThreads
+  val maxQueuedRequests = config.queuedMaxRequests
+  val sendBufferSize = config.socketSendBufferBytes
+  val recvBufferSize = config.socketReceiveBufferBytes
+  val maxRequestSize = config.socketRequestMaxBytes
+  val maxConnectionsPerIp = config.maxConnectionsPerIp
+  val connectionsMaxIdleMs = config.connectionsMaxIdleMs
+  val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
+  val totalProcessorThreads = numProcessorThreads * endpoints.size
+
+  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
+
+  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
+  private val processors = new Array[Processor](totalProcessorThreads)
+
   private[network] var acceptors =  mutable.Map[EndPoint,Acceptor]()
-  val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
-  private val allMetricNames = (0 until numProcessorThreads).map { i =>
+
+  private val allMetricNames = (0 until totalProcessorThreads).map { i =>
     val tags = new util.HashMap[String, String]()
     tags.put("networkProcessor", i.toString)
     new MetricName("io-wait-ratio", "socket-server-metrics", tags)
@@ -83,49 +91,31 @@ class SocketServer(val brokerId: Int,
   def startup() {
     val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
 
-    newGauge("NetworkProcessorAvgIdlePercent",
-      new Gauge[Double] {
-        def value = allMetricNames.map( metricName =>
-          metrics.metrics().get(metricName).value()).sum / numProcessorThreads
-      }
-    )
-
-
-    this.synchronized {
-      for (i <- 0 until numProcessorThreads) {
-        processors(i) = new Processor(i,
-          time,
-          maxRequestSize,
-          numProcessorThreads,
-          requestChannel,
-          quotas,
-          connectionsMaxIdleMs,
-          portToProtocol,
-          metrics
-          )
-        Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
-      }
-    }
-
-    // register the processor threads for notification of responses
-    requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
-   
-    // start accepting connections
-    // right now we will use the same processors for all ports, since we didn't implement different protocols
-    // in the future, we may implement different processors for SSL and Kerberos
-
     this.synchronized {
+      var processorBeginIndex = 0
       endpoints.values.foreach(endpoint => {
-        val acceptor = new Acceptor(endpoint.host, endpoint.port, processors, sendBufferSize, recvBufferSize, quotas, endpoint.protocolType, portToProtocol)
+        val acceptor = new Acceptor(endpoint.host, endpoint.port, sendBufferSize, recvBufferSize, config.brokerId, requestChannel, processors, processorBeginIndex, numProcessorThreads, quotas,
+          endpoint.protocolType, portToProtocol, channelConfigs,  maxQueuedRequests, maxRequestSize, connectionsMaxIdleMs, metrics, allMetricNames, time)
         acceptors.put(endpoint, acceptor)
         Utils.newThread("kafka-socket-acceptor-%s-%d".format(endpoint.protocolType.toString, endpoint.port), acceptor, false).start()
         acceptor.awaitStartup
+        processorBeginIndex += numProcessorThreads
       })
     }
 
+    newGauge("NetworkProcessorAvgIdlePercent",
+      new Gauge[Double] {
+        def value = allMetricNames.map( metricName =>
+          metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
+      }
+    )
+
     info("Started " + acceptors.size + " acceptor threads")
   }
 
+  // register the processor threads for notification of responses
+  requestChannel.addResponseListener(id => processors(id).wakeup())
+
   /**
    * Shutdown the socket server
    */
@@ -145,8 +135,8 @@ class SocketServer(val brokerId: Int,
       case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
     }
   }
-}
 
+}
 /**
  * A base class with some helper variables and methods
  */
@@ -188,7 +178,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
    * Is the server still running?
    */
   protected def isRunning = alive.get
-  
+
   /**
    * Close the given key and associated socket
    */
@@ -199,8 +189,9 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
       swallowError(key.cancel())
     }
   }
-  
+
   def close(channel: SocketChannel) {
+
     if(channel != null) {
       debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
       connectionQuotas.dec(channel.socket.getInetAddress)
@@ -213,25 +204,55 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val host: String, 
+private[kafka] class Acceptor(val host: String,
                               private val port: Int,
-                              private val processors: Array[Processor],
-                              val sendBufferSize: Int, 
+                              val sendBufferSize: Int,
                               val recvBufferSize: Int,
+                              brokerId: Int,
+                              requestChannel: RequestChannel,
+                              processors: Array[Processor],
+                              processorBeginIndex: Int,
+                              numProcessorThreads: Int,
                               connectionQuotas: ConnectionQuotas,
                               protocol: SecurityProtocol,
-                              portToProtocol: ConcurrentHashMap[Int, SecurityProtocol]) extends AbstractServerThread(connectionQuotas) {
+                              portToProtocol: ConcurrentHashMap[Int, SecurityProtocol],
+                              channelConfigs: java.util.Map[String, Object],
+                              maxQueuedRequests: Int,
+                              maxRequestSize: Int,
+                              connectionsMaxIdleMs: Long,
+                              metrics: Metrics,
+                              allMetricNames: Seq[MetricName],
+                              time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
   val nioSelector = java.nio.channels.Selector.open()
   val serverChannel = openServerSocket(host, port)
+  val processorEndIndex = processorBeginIndex + numProcessorThreads
+
   portToProtocol.put(serverChannel.socket().getLocalPort, protocol)
 
+  this.synchronized {
+    for (i <- processorBeginIndex until processorEndIndex) {
+        processors(i) = new Processor(i,
+          time,
+          maxRequestSize,
+          numProcessorThreads,
+          requestChannel,
+          connectionQuotas,
+          connectionsMaxIdleMs,
+          protocol,
+          channelConfigs,
+          metrics
+          )
+        Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, protocol.name, i), processors(i), false).start()
+    }
+  }
+
   /**
    * Accept loop that checks for new connection attempts
    */
   def run() {
     serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
     startupComplete()
-    var currentProcessor = 0
+    var currentProcessor = processorBeginIndex
     while(isRunning) {
       val ready = nioSelector.select(500)
       if(ready > 0) {
@@ -248,7 +269,8 @@ private[kafka] class Acceptor(val host: String,
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
 
             // round robin to the next processor thread
-            currentProcessor = (currentProcessor + 1) % processors.length
+            currentProcessor = (currentProcessor + 1) % processorEndIndex
+            if (currentProcessor < processorBeginIndex) currentProcessor = processorBeginIndex
           } catch {
             case e: Throwable => error("Error while accepting connection", e)
           }
@@ -260,12 +282,12 @@ private[kafka] class Acceptor(val host: String,
     swallowError(nioSelector.close())
     shutdownComplete()
   }
-  
+
   /*
    * Create a server socket to listen for connections on.
    */
   def openServerSocket(host: String, port: Int): ServerSocketChannel = {
-    val socketAddress = 
+    val socketAddress =
       if(host == null || host.trim.isEmpty)
         new InetSocketAddress(port)
       else
@@ -277,7 +299,7 @@ private[kafka] class Acceptor(val host: String,
       serverChannel.socket.bind(socketAddress)
       info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
     } catch {
-      case e: SocketException => 
+      case e: SocketException =>
         throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
     }
     serverChannel
@@ -328,15 +350,17 @@ private[kafka] class Processor(val id: Int,
                                val requestChannel: RequestChannel,
                                connectionQuotas: ConnectionQuotas,
                                val connectionsMaxIdleMs: Long,
-                               val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol],
+                               val protocol: SecurityProtocol,
+                               val channelConfigs: java.util.Map[String, Object],
                                val metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
-
+  private val channelBuilder = createChannelBuilder
   private val metricTags = new util.HashMap[String, String]()
   metricTags.put("networkProcessor", id.toString)
 
+
   newGauge("IdlePercent",
     new Gauge[Double] {
       def value = {
@@ -353,7 +377,8 @@ private[kafka] class Processor(val id: Int,
     time,
     "socket-server",
     metricTags,
-    false)
+    false,
+    channelBuilder)
 
   override def run() {
     startupComplete()
@@ -379,7 +404,7 @@ private[kafka] class Processor(val id: Int,
         }
         collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => {
           try {
-            val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
+            val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
             requestChannel.sendRequest(req)
           } catch {
             case e @ (_: InvalidRequestException | _: SchemaException) => {
@@ -474,6 +499,14 @@ private[kafka] class Processor(val id: Int,
     }
   }
 
+  private def createChannelBuilder(): ChannelBuilder = {
+    val channelBuilder: ChannelBuilder = if (protocol == SecurityProtocol.SSL)  new SSLChannelBuilder(SSLFactory.Mode.SERVER)
+                                        else new PlaintextChannelBuilder()
+
+    channelBuilder.configure(channelConfigs)
+    channelBuilder
+  }
+
   /**
    * Close all open connections
    */
@@ -492,7 +525,7 @@ private[kafka] class Processor(val id: Int,
 class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
   private val overrides = overrideQuotas.map(entry => (InetAddress.getByName(entry._1), entry._2))
   private val counts = mutable.Map[InetAddress, Int]()
-  
+
   def inc(addr: InetAddress) {
     counts synchronized {
       val count = counts.getOrElse(addr, 0)
@@ -502,7 +535,7 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
         throw new TooManyConnectionsException(addr, max)
     }
   }
-  
+
   def dec(addr: InetAddress) {
     counts synchronized {
       val count = counts.get(addr).get
@@ -512,7 +545,7 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
         counts.put(addr, count - 1)
     }
   }
-  
+
 }
 
 class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count))


Mime
View raw message