kafka-commits mailing list archives

From jun...@apache.org
Subject [4/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed by Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao
Date Wed, 19 Aug 2015 04:53:38 GMT
kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed by Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e2c683f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e2c683f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e2c683f

Branch: refs/heads/trunk
Commit: 9e2c683f550b7ae58008d0bcb62238b7a2d89a65
Parents: 503bd36
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Tue Aug 18 21:51:15 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Aug 18 21:51:15 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   6 +-
 checkstyle/import-control.xml                   |  52 +-
 .../org/apache/kafka/clients/ClientUtils.java   |  37 +-
 .../kafka/clients/CommonClientConfigs.java      |  18 +-
 .../org/apache/kafka/clients/NetworkClient.java |  41 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |  26 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  92 +--
 .../kafka/clients/producer/KafkaProducer.java   |   7 +-
 .../kafka/clients/producer/ProducerConfig.java  |  18 +
 .../kafka/common/config/AbstractConfig.java     |  10 +-
 .../apache/kafka/common/config/SSLConfigs.java  | 102 +++
 .../kafka/common/network/Authenticator.java     |  62 ++
 .../kafka/common/network/ByteBufferSend.java    |  13 +-
 .../kafka/common/network/ChannelBuilder.java    |  44 ++
 .../common/network/DefaultAuthenticator.java    |  63 ++
 .../kafka/common/network/KafkaChannel.java      | 166 +++++
 .../kafka/common/network/NetworkReceive.java    |   5 +-
 .../common/network/PlaintextChannelBuilder.java |  58 ++
 .../common/network/PlaintextTransportLayer.java | 217 ++++++
 .../kafka/common/network/SSLChannelBuilder.java |  68 ++
 .../kafka/common/network/SSLTransportLayer.java | 690 +++++++++++++++++++
 .../apache/kafka/common/network/Selectable.java |  12 +-
 .../apache/kafka/common/network/Selector.java   | 316 +++++----
 .../org/apache/kafka/common/network/Send.java   |   6 +-
 .../kafka/common/network/TransportLayer.java    |  86 +++
 .../kafka/common/protocol/SecurityProtocol.java |   2 +
 .../security/auth/DefaultPrincipalBuilder.java  |  43 ++
 .../common/security/auth/KafkaPrincipal.java    |  58 ++
 .../common/security/auth/PrincipalBuilder.java  |  51 ++
 .../kafka/common/security/ssl/SSLFactory.java   | 210 ++++++
 .../org/apache/kafka/common/utils/Utils.java    |  53 +-
 .../apache/kafka/clients/ClientUtilsTest.java   |   2 +-
 .../clients/producer/KafkaProducerTest.java     |  19 +-
 .../apache/kafka/common/network/EchoServer.java | 119 ++++
 .../kafka/common/network/SSLSelectorTest.java   | 276 ++++++++
 .../kafka/common/network/SelectorTest.java      | 110 ++-
 .../common/security/ssl/SSLFactoryTest.java     |  60 ++
 .../apache/kafka/common/utils/UtilsTest.java    |   4 +-
 .../org/apache/kafka/test/MockSelector.java     |   8 +-
 .../org/apache/kafka/test/TestSSLUtils.java     | 243 +++++++
 .../main/scala/kafka/api/FetchResponse.scala    |  24 +-
 .../main/scala/kafka/network/SocketServer.scala | 175 +++--
 .../main/scala/kafka/server/KafkaConfig.scala   | 218 ++++--
 .../main/scala/kafka/server/KafkaServer.scala   | 110 ++-
 .../kafka/api/ProducerSendTest.scala            |   4 +-
 .../integration/kafka/api/SSLConsumerTest.scala | 251 +++++++
 .../kafka/api/SSLProducerSendTest.scala         | 240 +++++++
 .../unit/kafka/admin/AddPartitionsTest.scala    |   2 +-
 .../integration/UncleanLeaderElectionTest.scala |   6 +-
 .../unit/kafka/network/SocketServerTest.scala   |  86 ++-
 .../unit/kafka/server/KafkaConfigTest.scala     |  42 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  89 ++-
 52 files changed, 4121 insertions(+), 599 deletions(-)
----------------------------------------------------------------------
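
Taken together, these changes let a client talk to a broker over an
encrypted connection purely through configuration. Below is a minimal
sketch of the producer side, assuming a broker already exposes an SSL
listener; the host, port, topic name and truststore path/password are
illustrative, not taken from this commit.

    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.config.SSLConfigs;

    public class SslProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9093");  // assumed SSL listener
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
            props.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                      "/path/to/client.truststore.jks");        // illustrative path
            props.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.ByteArraySerializer");

            Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            producer.send(new ProducerRecord<byte[], byte[]>("test", "hello".getBytes()));
            producer.close();
        }
    }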


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 983587f..17fc223 100644
--- a/build.gradle
+++ b/build.gradle
@@ -256,9 +256,10 @@ project(':core') {
     testCompile "$junit"
     testCompile "$easymock"
     testCompile 'org.objenesis:objenesis:1.2'
+    testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52'
     testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
-    testCompile project(path: ':clients', configuration: 'archives')
-
+    testCompile project(':clients')
+    testCompile project(':clients').sourceSets.test.output
     testRuntime "$slf4jlog4j"
 
     zinc 'com.typesafe.zinc:zinc:0.3.7'
@@ -390,6 +391,7 @@ project(':clients') {
     compile 'org.xerial.snappy:snappy-java:1.1.1.7'
     compile 'net.jpountz.lz4:lz4:1.2.0'
 
+    testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52'
     testCompile "$junit"
     testRuntime "$slf4jlog4j"
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e3f4f84..eb682f4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -1,6 +1,6 @@
 <!DOCTYPE import-control PUBLIC
-    "-//Puppy Crawl//DTD Import Control 1.1//EN"
-    "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+"-//Puppy Crawl//DTD Import Control 1.1//EN"
+"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
 <!--
 // Licensed to the Apache Software Foundation (ASF) under one or more
 // contributor license agreements.  See the NOTICE file distributed with
@@ -8,68 +8,79 @@
 // The ASF licenses this file to You under the Apache License, Version 2.0
 // (the "License"); you may not use this file except in compliance with
 // the License.  You may obtain a copy of the License at
-// 
+//
 //    http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
---> 
+-->
+
 <import-control pkg="org.apache.kafka">
-	
+
 	<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
-	
+
 	<!-- common library dependencies -->
 	<allow pkg="java" />
 	<allow pkg="javax.management" />
 	<allow pkg="org.slf4j" />
 	<allow pkg="org.junit" />
-	
+	<allow pkg="javax.net.ssl" />
+
 	<!-- no one depends on the server -->
 	<disallow pkg="kafka" />
-	
+
 	<!-- anyone can use public classes -->
 	<allow pkg="org.apache.kafka.common" exact-match="true" />
 	<allow pkg="org.apache.kafka.common.utils" />
-	
+
 	<subpackage name="common">
 		<disallow pkg="org.apache.kafka.clients" />
 		<allow pkg="org.apache.kafka.common" exact-match="true" />
 		<allow pkg="org.apache.kafka.test" />
-	
+
 		<subpackage name="config">
 			<allow pkg="org.apache.kafka.common.config" />
 			<!-- for testing -->
 			<allow pkg="org.apache.kafka.common.metrics" />
 		</subpackage>
-	
+
 		<subpackage name="metrics">
 			<allow pkg="org.apache.kafka.common.metrics" />
 		</subpackage>
-	
+
 		<subpackage name="network">
+			<allow pkg="org.apache.kafka.common.security.auth" />
+			<allow pkg="org.apache.kafka.common.protocol" />
+			<allow pkg="org.apache.kafka.common.config" />
 			<allow pkg="org.apache.kafka.common.metrics" />
+      <allow pkg="org.apache.kafka.common.security" />
 		</subpackage>
-	
+
+		<subpackage name="security">
+      <allow pkg="org.apache.kafka.common.network" />
+      <allow pkg="org.apache.kafka.common.config" />
+    </subpackage>
+
 		<subpackage name="protocol">
 			<allow pkg="org.apache.kafka.common.errors" />
 			<allow pkg="org.apache.kafka.common.protocol.types" />
 		</subpackage>
-	
+
 		<subpackage name="record">
 			<allow pkg="net.jpountz" />
 			<allow pkg="org.apache.kafka.common.record" />
 		</subpackage>
-	
+
 		<subpackage name="requests">
 			<allow pkg="org.apache.kafka.common.protocol" />
 			<allow pkg="org.apache.kafka.common.network" />
 			<!-- for testing -->
 			<allow pkg="org.apache.kafka.common.errors" />
 		</subpackage>
-	
+
 		<subpackage name="serialization">
 			<allow class="org.apache.kafka.common.errors.SerializationException" />
 		</subpackage>
@@ -80,15 +91,15 @@
 		<allow pkg="org.slf4j" />
 		<allow pkg="org.apache.kafka.clients" exact-match="true"/>
 		<allow pkg="org.apache.kafka.test" />
-	
+
 		<subpackage name="consumer">
 			<allow pkg="org.apache.kafka.clients.consumer" />
 		</subpackage>
-	
+
 		<subpackage name="producer">
 			<allow pkg="org.apache.kafka.clients.producer" />
 		</subpackage>
-	
+
 		<subpackage name="tools">
 			<allow pkg="org.apache.kafka.clients.producer" />
 			<allow pkg="org.apache.kafka.clients.consumer" />
@@ -106,6 +117,7 @@
 
 	<subpackage name="test">
 		<allow pkg="org.apache.kafka" />
+		<allow pkg="org.bouncycastle" />
 	</subpackage>
 
 	<subpackage name="copycat">

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 0d68bf1..ba3bcbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -16,8 +16,14 @@ import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.SSLChannelBuilder;
+import org.apache.kafka.common.network.PlaintextChannelBuilder;
+import org.apache.kafka.common.security.ssl.SSLFactory;
 import org.apache.kafka.common.config.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
+
 public class ClientUtils {
     private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
 
@@ -61,4 +68,28 @@ public class ClientUtils {
             }
         }
     }
-}
\ No newline at end of file
+
+    /**
+     * @param configs client/server configs
+     * returns ChannelBuilder configured channelBuilder based on the configs.
+     */
+    public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
+        SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+        ChannelBuilder channelBuilder = null;
+
+        switch (securityProtocol) {
+            case SSL:
+                channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
+                break;
+            case PLAINTEXT:
+                channelBuilder = new PlaintextChannelBuilder();
+                break;
+            default:
+                throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+        }
+
+        channelBuilder.configure(configs);
+        return channelBuilder;
+    }
+
+}
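
The new createChannelBuilder helper is the single switch point between
the plaintext and SSL transports on the client side. A hedged usage
sketch (the config map here carries only the protocol key; a real
client map would include the SSL settings as well):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.ClientUtils;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.network.ChannelBuilder;

    public class ChannelBuilderSketch {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
            // Returns a configured PlaintextChannelBuilder; "SSL" would yield
            // an SSLChannelBuilder in client mode. A string that is not a
            // SecurityProtocol name fails in valueOf, and a protocol the
            // switch does not handle throws ConfigException.
            ChannelBuilder builder = ClientUtils.createChannelBuilder(configs);
            System.out.println(builder.getClass().getSimpleName());
        }
    }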

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 2c421f4..7d24c6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -17,7 +17,7 @@ package org.apache.kafka.clients;
  * Some configurations shared by both producer and consumer
  */
 public class CommonClientConfigs {
-    
+
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
      */
@@ -27,10 +27,10 @@ public class CommonClientConfigs {
                                                        + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
                                                        + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
                                                        + "servers (you may want more than one, though, in case a server is down).";
-    
+
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
     public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
-    
+
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
     public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data.";
 
@@ -45,7 +45,7 @@ public class CommonClientConfigs {
 
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.";
-    
+
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
     public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The number of samples maintained to compute metrics.";
 
@@ -55,6 +55,10 @@ public class CommonClientConfigs {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
+    public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
+    public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.";
+    public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
+
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
     public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
-}
\ No newline at end of file
+}
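
The security.protocol value is resolved against the SecurityProtocol
enum by name, which is why the doc string limits it to PLAINTEXT and
SSL for now. A small sketch of that resolution:

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.protocol.SecurityProtocol;

    public class ProtocolResolveSketch {
        public static void main(String[] args) {
            // DEFAULT_SECURITY_PROTOCOL is "PLAINTEXT"; any configured string
            // must match an enum constant exactly or valueOf will throw.
            SecurityProtocol protocol =
                SecurityProtocol.valueOf(CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL);
            System.out.println(protocol);   // PLAINTEXT
        }
    }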

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0e51d7b..b31f7f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -101,7 +101,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Begin connecting to the given node, return true if we are already connected and ready to send to that node.
-     * 
+     *
      * @param node The node to check
      * @param now The current timestamp
      * @return True if we are ready to send to the given node
@@ -122,7 +122,7 @@ public class NetworkClient implements KafkaClient {
      * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
      * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
      * connections.
-     * 
+     *
      * @param node The node to check
      * @param now The current timestamp
      * @return The number of milliseconds to wait.
@@ -147,7 +147,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Check if the node with the given id is ready to send more requests.
-     * 
+     *
      * @param node The node
      * @param now The current time in ms
      * @return true if the node is ready
@@ -161,21 +161,21 @@ public class NetworkClient implements KafkaClient {
             return false;
         else
             // otherwise we are ready if we are connected and can send more requests
-            return isSendable(nodeId);
+            return canSendRequest(nodeId);
     }
 
     /**
      * Are we connected and ready and able to send more requests to the given connection?
-     * 
+     *
      * @param node The node
      */
-    private boolean isSendable(String node) {
-        return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
+    private boolean canSendRequest(String node) {
+        return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
     }
 
     /**
      * Return the state of the connection to the given node
-     * 
+     *
      * @param node The node to check
      * @return The connection state
      */
@@ -185,13 +185,13 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
-     * 
+     *
      * @param request The request
      */
     @Override
     public void send(ClientRequest request) {
         String nodeId = request.request().destination();
-        if (!isSendable(nodeId))
+        if (!canSendRequest(nodeId))
             throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
 
         this.inFlightRequests.add(request);
@@ -200,7 +200,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Do actual reads and writes to sockets.
-     * 
+     *
      * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately
      * @param now The current time in milliseconds
      * @return The list of responses received
@@ -246,7 +246,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Await all the outstanding responses for requests on the given connection
-     * 
+     *
      * @param node The node to block on
      * @param now The current time in ms
      * @return All the collected responses
@@ -294,7 +294,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Generate a request header for the given API key
-     * 
+     *
      * @param key The api key
      * @return A request header with the appropriate client id and correlation id
      */
@@ -324,7 +324,7 @@ public class NetworkClient implements KafkaClient {
      * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
      * connection if all existing connections are in use. This method will never choose a node for which there is no
      * existing connection and from which we have disconnected within the reconnect backoff period.
-     * 
+     *
      * @return The node with the fewest in-flight requests.
      */
     public Node leastLoadedNode(long now) {
@@ -349,7 +349,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Handle any completed request send. In particular if no response is expected consider the request complete.
-     * 
+     *
      * @param responses The list of responses to update
      * @param now The current time
      */
@@ -366,7 +366,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Handle any completed receives and update the response list with the responses received.
-     * 
+     *
      * @param responses The list of responses to update
      * @param now The current time
      */
@@ -407,7 +407,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Handle any disconnected connections
-     * 
+     *
      * @param responses The list of responses that completed with the disconnection
      * @param now The current time
      */
@@ -472,8 +472,7 @@ public class NetworkClient implements KafkaClient {
         }
         String nodeConnectionId = node.idString();
 
-
-        if (connectionStates.isConnected(nodeConnectionId) && inFlightRequests.canSendMore(nodeConnectionId)) {
+        if (canSendRequest(nodeConnectionId)) {
             Set<String> topics = metadata.topics();
             this.metadataFetchInProgress = true;
             ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics);
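
The isSendable -> canSendRequest rename is more than cosmetic: with SSL,
a TCP connection that has finished connecting may still be mid-handshake,
so the check now also consults selector.isChannelReady(node). Callers
should therefore gate sends on ready()/poll(), roughly as in this hedged
sketch (the client, node, request and time arguments are assumed to be
already initialized):

    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.clients.NetworkClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.utils.Time;

    public final class ReadySendSketch {
        // Poll until the node is connected, its channel handshake has
        // completed and an in-flight slot is free, then hand off the send.
        static void sendWhenReady(NetworkClient client, Node node,
                                  ClientRequest request, Time time) {
            while (!client.ready(node, time.milliseconds()))
                client.poll(100, time.milliseconds());
            client.send(request);
        }
    }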

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index d35b421..9c9510a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -19,6 +19,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.config.SSLConfigs;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -153,7 +154,7 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String CHECK_CRCS_CONFIG = "check.crcs";
     private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
-    
+
     /** <code>key.deserializer</code> */
     public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
     private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
@@ -267,7 +268,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.BOOLEAN,
                                         true,
                                         Importance.LOW,
-                                        CHECK_CRCS_DOC)                                
+                                        CHECK_CRCS_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,
                                         30000,
@@ -293,12 +294,29 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
+                                .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
+                                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
+                                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
+                                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
+                                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,
                                         9 * 60 * 1000,
                                         Importance.MEDIUM,
                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
+
     }
 
     public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
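
With these definitions in place, the consumer accepts the same SSL key
set as the producer. A hedged configuration sketch (bootstrap address,
group id, topic and store paths are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SSLConfigs;

    public class SslConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "ssl-demo");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
            props.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                      "/path/to/client.truststore.jks");
            props.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<byte[], byte[]>(props);
            consumer.subscribe("test");
            consumer.close();
        }
    }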

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index be46b6c..3749880 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -31,8 +31,9 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -65,7 +66,6 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
  * Failure to close the consumer after use will leak these connections.
- * <p>
  * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
  *
  * <h3>Offsets and Consumer Position</h3>
@@ -85,9 +85,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  * detail below.
- * 
+ *
  * <h3>Consumer Groups</h3>
- * 
+ *
  * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
  * processing records. These processes can either be running on the same machine or, as is more likely, they can be
  * distributed over many machines to provide additional scalability and fault tolerance for processing.
@@ -116,14 +116,14 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
  * partition balancing.
- * 
+ *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
  * demonstrate how to use them.
- * 
+ *
  * <h4>Simple Processing</h4>
  * This example demonstrates the simplest usage of Kafka's consumer api.
- * 
+ *
  * <pre>
  *     Properties props = new Properties();
  *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
@@ -141,7 +141,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  *             System.out.printf(&quot;offset = %d, key = %s, value = %s&quot;, record.offset(), record.key(), record.value());
  *     }
  * </pre>
- * 
+ *
  * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
  * the config <code>auto.commit.interval.ms</code>.
  * <p>
@@ -161,9 +161,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
  * are saying that our record's key and value will just be simple strings.
- * 
+ *
  * <h4>Controlling When Messages Are Considered Consumed</h4>
- * 
+ *
  * In this example we will consume a batch of records and batch them up in memory, when we have sufficient records
  * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
  * would be considered consumed after they were given out by the consumer, and it would be possible that our process
@@ -175,7 +175,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
  * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
  * time but in failure cases could be duplicated.
- * 
+ *
  * <pre>
  *     Properties props = new Properties();
  *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
@@ -201,9 +201,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  *         }
  *     }
  * </pre>
- * 
+ *
  * <h4>Subscribing To Specific Partitions</h4>
- * 
+ *
  * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
  * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
  * instances of our program can divided up the work of processing records.
@@ -223,7 +223,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * This mode is easy to specify, rather than subscribing to the topic, the consumer just subscribes to particular
  * partitions:
- * 
+ *
  * <pre>
  *     String topic = &quot;foo&quot;;
  *     TopicPartition partition0 = new TopicPartition(topic, 0);
@@ -231,15 +231,15 @@ import static org.apache.kafka.common.utils.Utils.min;
  *     consumer.subscribe(partition0);
  *     consumer.subscribe(partition1);
  * </pre>
- * 
+ *
  * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
  * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
  * <p>
  * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
  * balancing) using the same consumer instance.
- * 
+ *
  * <h4>Managing Your Own Offsets</h4>
- * 
+ *
  * The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of it's own
  * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
  * consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
@@ -259,14 +259,14 @@ import static org.apache.kafka.common.utils.Utils.min;
  * This means that in this case the indexing process that comes back having lost recent updates just resumes indexing
  * from what it has ensuring that no updates are lost.
  * </ul>
- * 
+ *
  * Each record comes with it's own offset, so to manage your own offset you just need to do the following:
  * <ol>
  * <li>Configure <code>enable.auto.commit=false</code>
  * <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
  * <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
  * </ol>
- * 
+ *
  * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
  * search index use case described above). If the partition assignment is done automatically special care will also be
  * needed to handle the case where partition assignments change. This can be handled using a special callback specified
@@ -279,9 +279,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for
  * partitions that are moved elsewhere.
- * 
+ *
  * <h4>Controlling The Consumer's Position</h4>
- * 
+ *
  * In most use cases the consumer will simply consume records from beginning to end, periodically committing it's
  * position (either automatically or manually). However Kafka allows the consumer to manually control it's position,
  * moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
@@ -296,14 +296,14 @@ import static org.apache.kafka.common.utils.Utils.min;
  * the consumer will want to initialize it's position on start-up to whatever is contained in the local store. Likewise
  * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
  * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
- * 
+ *
  * Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special
  * methods for seeking to the earliest and latest offset the server maintains are also available (
  * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
- * 
+ *
  *
  * <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
- * 
+ *
  * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
  * making the call. It is the responsibility of the user to ensure that multi-threaded access
  * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
@@ -353,9 +353,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  * We have intentionally avoided implementing a particular threading model for processing. This leaves several
  * options for implementing multi-threaded processing of records.
  *
- * 
+ *
  * <h4>1. One Consumer Per Thread</h4>
- * 
+ *
  * A simple option is to give each thread it's own consumer instance. Here are the pros and cons of this approach:
  * <ul>
  * <li><b>PRO</b>: It is the easiest to implement
@@ -368,13 +368,13 @@ import static org.apache.kafka.common.utils.Utils.min;
  * which can cause some drop in I/O throughput.
  * <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions.
  * </ul>
- * 
+ *
  * <h4>2. Decouple Consumption and Processing</h4>
- * 
+ *
  * Another alternative is to have one or more consumer threads that do all data consumption and hands off
  * {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle
  * the record processing.
- * 
+ *
  * This option likewise has pros and cons:
  * <ul>
  * <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it
@@ -385,11 +385,11 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure
  * that processing is complete for that partition.
  * </ul>
- * 
+ *
  * There are many possible variations on this approach. For example each processor thread can have it's own queue, and
  * the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify
  * commit.
- * 
+ *
  */
 @InterfaceStability.Unstable
 public class KafkaConsumer<K, V> implements Consumer<K, V> {
@@ -430,7 +430,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * string "42" or the integer 42).
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
-     * 
+     *
      * @param configs The consumer configs
      */
     public KafkaConsumer(Map<String, Object> configs) {
@@ -442,7 +442,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
-     * 
+     *
      * @param configs The consumer configs
      * @param callback A callback interface that the user can implement to manage customized offsets on the start and
      *            end of every rebalance operation.
@@ -476,7 +476,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
-     * 
+     *
      * @param properties The consumer configuration properties
      * @param callback A callback interface that the user can implement to manage customized offsets on the start and
      *            end of every rebalance operation.
@@ -524,12 +524,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 0);
-
             String metricGrpPrefix = "consumer";
             Map<String, String> metricsTags = new LinkedHashMap<String, String>();
             metricsTags.put("client-id", clientId);
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags),
+                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice
@@ -623,7 +623,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <li>An existing member of the consumer group dies
      * <li>A new member is added to an existing consumer group via the join API
      * </ul>
-     * 
+     *
      * @param topics A variable list of topics that the consumer wants to subscribe to
      */
     @Override
@@ -664,7 +664,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not
      * be returned from the next {@link #poll(long) poll()} onwards
-     * 
+     *
      * @param topics Topics to unsubscribe from
      */
     public void unsubscribe(String... topics) {
@@ -682,7 +682,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Unsubscribe from the specific topic partitions. records for these partitions will not be returned from the next
      * {@link #poll(long) poll()} onwards
-     * 
+     *
      * @param partitions Partitions to unsubscribe from
      */
     public void unsubscribe(TopicPartition... partitions) {
@@ -705,11 +705,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
      * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
      * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
-     * 
+     *
      * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
      *            immediately with any records available now. Must not be negative.
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
-     * 
+     *
      * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
      *             offset reset policy has been configured.
      */
@@ -932,7 +932,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
-     * 
+     *
      * @param partition The partition to get the position for
      * @return The offset
      * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
@@ -961,7 +961,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
      * consumer hasn't yet initialized it's cache of committed offsets.
-     * 
+     *
      * @param partition The partition to check
      * @return The last committed offset
      * @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given
@@ -1003,7 +1003,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
      * does not already have any metadata about the given topic.
-     * 
+     *
      * @param topic The topic to get partition metadata for
      * @return The list of partitions
      */
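
The only structural change in the consumer is that the ChannelBuilder
produced from the configs is now threaded into the Selector, which uses
it to wrap each connection in a plaintext or SSL KafkaChannel. A hedged
sketch of that wiring (the idle timeout, metrics object and tags are
placeholder values, not the consumer's real ones):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.kafka.clients.ClientUtils;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.network.ChannelBuilder;
    import org.apache.kafka.common.network.Selector;
    import org.apache.kafka.common.utils.SystemTime;

    public class SelectorWiringSketch {
        static Selector buildSelector(Map<String, Object> configs) {
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(configs);
            return new Selector(9 * 60 * 1000L, new Metrics(), new SystemTime(),
                    "consumer", new LinkedHashMap<String, String>(), channelBuilder);
        }
    }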

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 03b8dd2..c4621e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
@@ -226,9 +227,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     metricTags);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
-
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient client = new NetworkClient(
-                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags),
+                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
                     this.metadata,
                     clientId,
                     config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
@@ -305,7 +306,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * <p>
      * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
      * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
-     * get()} on this future will block until the associated request completes and then return the metadata for the record 
+     * get()} on this future will block until the associated request completes and then return the metadata for the record
      * or throw any exception that occurred while sending the record.
      * <p>
      * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index aa26420..06f00a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -22,6 +22,7 @@ import java.util.Properties;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -225,9 +226,26 @@ public class ProducerConfig extends AbstractConfig {
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
                                 .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
+                                .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
+                                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
+                                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
+                                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
+                                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
+
     }
 
     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
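
The producer picks up the identical SSL definitions. One point worth
illustrating is two-way authentication: the truststore settings shown
earlier suffice when only the server is authenticated, while the
keystore settings matter when the broker demands a client certificate.
A hedged fragment (paths and passwords illustrative):

    import java.util.Properties;
    import org.apache.kafka.common.config.SSLConfigs;

    public class ClientAuthSketch {
        // Adds the keystore settings needed when the broker requires
        // client certificates; harmless to omit otherwise.
        static void addClientAuth(Properties props) {
            props.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                      "/path/to/client.keystore.jks");
            props.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "changeit");
            props.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, "changeit");
        }
    }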

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 6c31748..156ec14 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -107,6 +107,10 @@ public class AbstractConfig {
         return copy;
     }
 
+    public Map<String, ?> values() {
+        return new HashMap<String, Object>(values);
+    }
+
     private void logAll() {
         StringBuilder b = new StringBuilder();
         b.append(getClass().getSimpleName());
@@ -133,7 +137,7 @@ public class AbstractConfig {
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
-     * 
+     *
      * @param key The configuration key for the class
      * @param t The interface the class should implement
      * @return A configured instance of the class

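The new values() accessor hands out a defensive copy, so callers (such as the channel builders below) can read the parsed config without being able to mutate it. A small self-contained illustration of the same pattern, outside of AbstractConfig:

    import java.util.HashMap;
    import java.util.Map;

    public class ValuesCopyDemo {
        private final Map<String, Object> values = new HashMap<String, Object>();

        public Map<String, ?> values() {
            return new HashMap<String, Object>(values); // snapshot, not the live map
        }

        public static void main(String[] args) {
            ValuesCopyDemo cfg = new ValuesCopyDemo();
            cfg.values.put("ssl.keystore.type", "JKS");
            Map<String, ?> snapshot = cfg.values();
            System.out.println(snapshot);  // prints {ssl.keystore.type=JKS}
            // Mutating 'snapshot' cannot corrupt cfg.values.
        }
    }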
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
new file mode 100644
index 0000000..dd7b71a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.KeyManagerFactory;
+
+public class SSLConfigs {
+    /*
+     * NOTE: DO NOT CHANGE ANY OF THESE CONFIG NAMES AS THEY ARE PART OF THE PUBLIC API AND CHANGING THEM WILL BREAK USER CODE.
+     */
+
+    public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
+    public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The principal builder class used to generate a java.security.Principal. This config is optional for clients.";
+    public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
+
+    public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
+    public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
+            + "Default setting is TLS. Allowed values are SSL, SSLv2, SSLv3, TLS, TLSv1.1, TLSv1.2.";
+    public static final String DEFAULT_SSL_PROTOCOL = "TLS";
+
+    public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
+    public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
+
+    public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
+    public static final String SSL_CIPHER_SUITES_DOC = "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. "
+            + "By default all the available cipher suites are supported.";
+
+    public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
+    public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
+            + "All versions of TLS are enabled by default.";
+    public static final String DEFAULT_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
+
+    public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
+    public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
+            + "This is optional for clients. Default value is JKS.";
+    public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
+
+    public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
+    public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
+        + "This is optional for clients and can be used for two-way authentication of the client.";
+
+    public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
+    public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file. "
+        + "This is optional for clients and only needed if ssl.keystore.location is configured.";
+
+    public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
+    public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "
+            + "This is optional for clients.";
+
+    public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
+    public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. "
+            + "Default value is JKS.";
+    public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
+
+    public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
+    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
+    public static final String DEFAULT_TRUSTSTORE_LOCATION = "/tmp/ssl.truststore.jks";
+
+    public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
+    public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. ";
+    public static final String DEFAULT_TRUSTSTORE_PASSWORD = "truststore_password";
+
+    public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
+    public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. "
+            + "Default value is the key manager factory algorithm configured for the Java Virtual Machine.";
+    public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
+
+    public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";
+    public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by trust manager factory for SSL connections. "
+            + "Default value is the trust manager factory algorithm configured for the Java Virtual Machine.";
+    public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm();
+
+    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
+    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm used to validate the server hostname using the server certificate.";
+
+    public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
+    public static final String SSL_CLIENT_AUTH_DOC = "Configures the kafka broker to request client authentication."
+                                           + " The following settings are common: "
+                                           + " <ul>"
+                                           + " <li><code>ssl.client.auth=required</code> If set to required,"
+                                           + " client authentication is required."
+                                           + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional."
+                                           + " Unlike required, with this option the client can choose not to provide authentication information about itself."
+                                           + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed."
+                                           + " </ul>";
+
+    public static final String SSL_NEED_CLIENT_AUTH_DOC = "Whether the broker requires client authentication; client authentication can instead be merely requested via ssl.client.auth. "
+        + "Default value is false.";
+    public static final Boolean DEFAULT_SSL_NEED_CLIENT_AUTH = false;
+
+}

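The key and trust manager algorithm defaults above pick up whatever the running JVM reports. A quick way to check them locally (output varies by JVM; SunX509 and PKIX are typical on Oracle/OpenJDK builds):

    import javax.net.ssl.KeyManagerFactory;
    import javax.net.ssl.TrustManagerFactory;

    public class AlgorithmDefaults {
        public static void main(String[] args) {
            // These are the same calls SSLConfigs uses for its defaults.
            System.out.println(KeyManagerFactory.getDefaultAlgorithm());   // e.g. SunX509
            System.out.println(TrustManagerFactory.getDefaultAlgorithm()); // e.g. PKIX
        }
    }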
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
new file mode 100644
index 0000000..261f571
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.security.Principal;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Authentication for a Channel
+ */
+public interface Authenticator {
+
+    /**
+     * Configures this Authenticator using the given transportLayer and principalBuilder.
+     * @param transportLayer the TransportLayer used to read and write tokens
+     * @param principalBuilder the PrincipalBuilder used to construct the Principal
+     */
+    void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder);
+
+    /**
+     * Implements any authentication mechanism. Uses the transportLayer to read or write tokens.
+     * Returns immediately if no further authentication needs to be done.
+     */
+    void authenticate() throws IOException;
+
+    /**
+     * Returns Principal using PrincipalBuilder
+     */
+    Principal principal() throws KafkaException;
+
+    /**
+     * Returns true if authentication is complete, otherwise false.
+     */
+    boolean complete();
+
+    /**
+     * Closes this Authenticator
+     *
+     * @throws IOException if any I/O error occurs
+     */
+    void close() throws IOException;
+
+}

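To illustrate the contract, here is a minimal hypothetical implementation (not part of this patch) that performs no token exchange and reports a fixed principal; a real mechanism would drive its handshake from authenticate() and report complete() only once finished:

    import java.io.IOException;
    import java.security.Principal;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.network.Authenticator;
    import org.apache.kafka.common.network.TransportLayer;
    import org.apache.kafka.common.security.auth.PrincipalBuilder;

    public class NoOpAuthenticator implements Authenticator {

        public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder) {
            // A real implementation would keep these to exchange tokens and build the principal.
        }

        public void authenticate() throws IOException {
            // Nothing to exchange; a multi-round mechanism would read/write here until done.
        }

        public Principal principal() throws KafkaException {
            return new Principal() {            // fixed placeholder identity
                public String getName() {
                    return "anonymous";
                }
            };
        }

        public boolean complete() {
            return true;                        // no handshake, so always complete
        }

        public void close() throws IOException {}
    }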
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index df0e6d5..d7357b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -26,6 +26,7 @@ public class ByteBufferSend implements Send {
     protected final ByteBuffer[] buffers;
     private int remaining;
     private int size;
+    private boolean pending = false;
 
     public ByteBufferSend(String destination, ByteBuffer... buffers) {
         super();
@@ -43,7 +44,7 @@ public class ByteBufferSend implements Send {
 
     @Override
     public boolean completed() {
-        return remaining <= 0;
+        return remaining <= 0 && !pending;
     }
 
     @Override
@@ -57,6 +58,12 @@ public class ByteBufferSend implements Send {
         if (written < 0)
             throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
         remaining -= written;
+        // This is a temporary workaround, as the Send and Receive interfaces are used by BlockingChannel.
+        // Once BlockingChannel is removed we can make Send and Receive work with a TransportLayer rather
+        // than a GatheringByteChannel or ScatteringByteChannel.
+        if (channel instanceof TransportLayer)
+            pending = ((TransportLayer) channel).hasPendingWrites();
+
         return written;
     }
 }

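The pending flag matters because an SSL transport can buffer encrypted bytes internally after write() returns, so "all plaintext handed over" is not the same as "all bytes flushed". A small self-contained sketch of the caller-side drain loop that relies on this completed() contract (a plain FileChannel stands in for the network channel):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class DrainLoopSketch {
        public static void main(String[] args) throws IOException {
            Path tmp = Files.createTempFile("send", ".bin");
            try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                ByteBuffer[] buffers = {ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))};
                long remaining = buffers[0].remaining();
                while (remaining > 0)                    // keep writing until fully drained,
                    remaining -= channel.write(buffers); // as the Selector does across polls
            }
            Files.delete(tmp);
        }
    }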
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
new file mode 100644
index 0000000..52a7aab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.util.Map;
+import java.nio.channels.SelectionKey;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * A ChannelBuilder interface to build a KafkaChannel based on configs
+ */
+public interface ChannelBuilder {
+
+    /**
+     * Configure this class with the given key-value pairs
+     */
+    void configure(Map<String, ?> configs) throws KafkaException;
+
+
+    /**
+     * Returns a KafkaChannel with the TransportLayer and Authenticator configured.
+     * @param  id  channel id
+     * @param  key SelectionKey
+     * @param  maxReceiveSize maximum size of a single network receive
+     */
+    KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException;
+
+
+    /**
+     * Closes ChannelBuilder
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
new file mode 100644
index 0000000..813a4aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import java.security.Principal;
+import java.io.IOException;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.KafkaException;
+
+public class DefaultAuthenticator implements Authenticator {
+
+    private TransportLayer transportLayer;
+    private PrincipalBuilder principalBuilder;
+    private Principal principal;
+
+    public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder) {
+        this.transportLayer = transportLayer;
+        this.principalBuilder = principalBuilder;
+    }
+
+    /**
+     * No-Op for default authenticator
+     */
+    public void authenticate() throws IOException {}
+
+    /**
+     * Constructs Principal using configured principalBuilder.
+     * @return Principal
+     * @throws KafkaException
+     */
+    public Principal principal() throws KafkaException {
+        if (principal == null)
+            principal = principalBuilder.buildPrincipal(transportLayer, this);
+        return principal;
+    }
+
+    public void close() throws IOException {}
+
+    /**
+     * DefaultAuthenticator doesn't implement any additional authentication mechanism.
+     * @return true
+     */
+    public boolean complete() {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
new file mode 100644
index 0000000..28a4f41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+
+import java.io.IOException;
+
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+
+import java.security.Principal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaChannel {
+    private static final Logger log = LoggerFactory.getLogger(KafkaChannel.class);
+    private final String id;
+    private TransportLayer transportLayer;
+    private Authenticator authenticator;
+    private NetworkReceive receive;
+    private Send send;
+    private int maxReceiveSize;
+
+    public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
+        this.id = id;
+        this.transportLayer = transportLayer;
+        this.authenticator = authenticator;
+        this.maxReceiveSize = maxReceiveSize;
+    }
+
+    public void close() throws IOException {
+        transportLayer.close();
+        authenticator.close();
+    }
+
+    /**
+     * Returns the user principal for the session.
+     * For PLAINTEXT with no authentication, returns ANONYMOUS as the user principal.
+     * If SSL is used without any SASL authentication, returns SSLSession.peerPrincipal.
+     */
+    public Principal principal() throws IOException {
+        return authenticator.principal();
+    }
+
+    /**
+     * Performs the transportLayer handshake and authentication using the configured authenticator
+     */
+    public void prepare() throws IOException {
+        if (transportLayer.ready() && authenticator.complete())
+            return;
+        if (!transportLayer.ready())
+            transportLayer.handshake();
+        if (transportLayer.ready() && !authenticator.complete())
+            authenticator.authenticate();
+    }
+
+    public void disconnect() {
+        transportLayer.disconnect();
+    }
+
+
+    public void finishConnect() throws IOException {
+        transportLayer.finishConnect();
+    }
+
+    public boolean isConnected() {
+        return transportLayer.isConnected();
+    }
+
+    public String id() {
+        return id;
+    }
+
+    public void mute() {
+        transportLayer.removeInterestOps(SelectionKey.OP_READ);
+    }
+
+    public void unmute() {
+        transportLayer.addInterestOps(SelectionKey.OP_READ);
+    }
+
+    public boolean isMute() {
+        return transportLayer.isMute();
+    }
+
+    public boolean ready() {
+        return transportLayer.ready() && authenticator.complete();
+    }
+
+    public boolean hasSend() {
+        return send != null;
+    }
+
+    public String socketDescription() {
+        Socket socket = transportLayer.socketChannel().socket();
+        if (socket == null)
+            return "[unconnected socket]";
+        else if (socket.getInetAddress() != null)
+            return socket.getInetAddress().toString();
+        else
+            return socket.getLocalAddress().toString();
+    }
+
+    public void setSend(Send send) {
+        if (this.send != null)
+            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+        this.send = send;
+        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+    }
+
+    public NetworkReceive read() throws IOException {
+        NetworkReceive result = null;
+
+        if (receive == null) {
+            receive = new NetworkReceive(maxReceiveSize, id);
+        }
+
+        receive(receive);
+        if (receive.complete()) {
+            receive.payload().rewind();
+            result = receive;
+            receive = null;
+        }
+        return result;
+    }
+
+    public Send write() throws IOException {
+        Send result = null;
+        if (send != null && send(send)) {
+            result = send;
+            send = null;
+        }
+        return result;
+    }
+
+    private long receive(NetworkReceive receive) throws IOException {
+        return receive.readFrom(transportLayer);
+    }
+
+    private boolean send(Send send) throws IOException {
+        send.writeTo(transportLayer);
+        if (send.completed())
+            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+
+        return send.completed();
+    }
+
+}

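Putting it together, a hedged sketch (hypothetical class; the real loop lives in Selector.java elsewhere in this patch) of how a poll step might drive a KafkaChannel through handshake, reads and writes:

    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.common.network.KafkaChannel;
    import org.apache.kafka.common.network.NetworkReceive;
    import org.apache.kafka.common.network.Send;

    public class PollStepSketch {
        private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
        private final List<Send> completedSends = new ArrayList<Send>();

        void pollKey(KafkaChannel channel, SelectionKey key) throws IOException {
            if (!channel.ready())
                channel.prepare();                       // SSL handshake/auth may span several polls
            if (channel.ready() && key.isReadable()) {
                NetworkReceive receive = channel.read(); // null until a whole message has arrived
                if (receive != null)
                    completedReceives.add(receive);
            }
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();             // null until the in-flight send drains
                if (send != null)
                    completedSends.add(send);
            }
        }
    }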
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 3ca0098..2a1568e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -89,6 +89,7 @@ public class NetworkReceive implements Receive {
                     throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                 if (maxSize != UNLIMITED && receiveSize > maxSize)
                     throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
+
                 this.buffer = ByteBuffer.allocate(receiveSize);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
new file mode 100644
index 0000000..76dbf93
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.nio.channels.SelectionKey;
+import java.util.Map;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.KafkaException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PlaintextChannelBuilder implements ChannelBuilder {
+    private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class);
+    private PrincipalBuilder principalBuilder;
+
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        try {
+            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+            this.principalBuilder.configure(configs);
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+        KafkaChannel channel = null;
+        try {
+            PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
+            Authenticator authenticator = new DefaultAuthenticator();
+            authenticator.configure(transportLayer, this.principalBuilder);
+            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+        } catch (Exception e) {
+            log.warn("Failed to create channel", e);
+            throw new KafkaException(e);
+        }
+        return channel;
+    }
+
+    public void close() {
+        this.principalBuilder.close();
+    }
+
+}

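As a quick end-to-end check of the builder contract, a sketch that configures a PlaintextChannelBuilder by hand. Passing a Class object mirrors how configure() above casts the principal.builder.class entry; building an actual channel additionally needs a live SelectionKey from a Selector:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.config.SSLConfigs;
    import org.apache.kafka.common.network.ChannelBuilder;
    import org.apache.kafka.common.network.PlaintextChannelBuilder;

    public class BuilderSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> configs = new HashMap<String, Object>();
            // configure() casts this entry to Class<?>, so a Class object is supplied directly.
            configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
                        Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
            ChannelBuilder builder = new PlaintextChannelBuilder();
            builder.configure(configs);  // instantiates and configures the principal builder
            builder.close();
        }
    }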
