kafka-commits mailing list archives

From j...@apache.org
Subject [2/9] kafka-site git commit: Add 0.10.1 docs
Date Tue, 04 Oct 2016 21:26:30 GMT
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ed0bb0d9/0101/security.html
----------------------------------------------------------------------
diff --git a/0101/security.html b/0101/security.html
new file mode 100644
index 0000000..2e77c93
--- /dev/null
+++ b/0101/security.html
@@ -0,0 +1,746 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<h3><a id="security_overview" href="#security_overview">7.1 Security Overview</a></h3>
+In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increase security in a Kafka cluster. These features are considered to be of beta quality. The following security measures are currently supported:
+<ol>
+    <li>Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL (Kerberos).
+    SASL/PLAIN can also be used from release 0.10.0.0 onwards.</li>
+    <li>Authentication of connections from brokers to ZooKeeper</li>
+    <li>Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)</li>
+    <li>Authorization of read / write operations by clients</li>
+    <li>Authorization is pluggable and integration with external authorization services is supported</li>
+</ol>
+
+It's worth noting that security is optional - non-secured clusters are supported, as well as a mix of authenticated, unauthenticated, encrypted and non-encrypted clients.
+
+The guides below explain how to configure and use the security features in both clients and brokers.
+
+<h3><a id="security_ssl" href="#security_ssl">7.2 Encryption and Authentication using SSL</a></h3>
+Apache Kafka allows clients to connect over SSL. By default SSL is disabled but can be turned on as needed.
+
+<ol>
+    <li><h4><a id="security_ssl_key" href="#security_ssl_key">Generate SSL key and certificate for each Kafka broker</a></h4>
+        The first step of deploying SSL is to generate the key and the certificate for each machine in the cluster. You can use Java's keytool utility to accomplish this task.
+        We will generate the key into a temporary keystore initially so that we can export and sign it later with the CA.
+        <pre>
+        keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey</pre>
+
+        You need to specify two parameters in the above command:
+        <ol>
+            <li>keystore: the keystore file that stores the certificate. The keystore file contains the private key of the certificate; therefore, it needs to be kept safely.</li>
+            <li>validity: the valid time of the certificate in days.</li>
+        </ol>
+        <br>
+	Note: By default the property <code>ssl.endpoint.identification.algorithm</code> is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:
+
+	<pre>	ssl.endpoint.identification.algorithm=HTTPS </pre>
+
+	Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:
+	<ol>
+		<li>Common Name (CN)
+		<li>Subject Alternative Name (SAN)
+	</ol>
+	<br>
+	Both fields are valid; however, RFC-2818 recommends the use of SAN. SAN is also more flexible, allowing multiple DNS entries to be declared. Another advantage is that the CN can then be set to a more meaningful value for authorization purposes. To add a SAN field, append the following argument <code>-ext SAN=DNS:{FQDN}</code> to the keytool command:
+	<pre>
+	keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -ext SAN=DNS:{FQDN}
+	</pre>
+	The following command can be run afterwards to verify the contents of the generated certificate:
+	<pre>
+	keytool -list -v -keystore server.keystore.jks
+	</pre>
+    </li>
+    <li><h4><a id="security_ssl_ca" href="#security_ssl_ca">Creating your own CA</a></h4>
+        After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.<p>
+        Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports: the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.
+        <pre>
+        openssl req <b>-new</b> -x509 -keyout ca-key -out ca-cert -days 365</pre>
+
+        The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.<br>
+
+        The next step is to add the generated CA to the <b>clients' truststore</b> so that the clients can trust this CA:
+        <pre>
+        keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert</pre>
+
+        <b>Note:</b> If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to "requested" or "required" in the <a href="#config_broker">Kafka broker config</a>, then you must provide a truststore for the Kafka brokers as well, and it should contain all the CA certificates that clients' keys were signed by.
+        <pre>
+        keytool -keystore server.truststore.jks -alias CARoot <b>-import</b> -file ca-cert</pre>
+
+        In contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.</li>
+
+    <li><h4><a id="security_ssl_signing" href="#security_ssl_signing">Signing the certificate</a></h4>
+        The next step is to sign all certificates generated by step 1 with the CA generated in step 2. First, you need to export the certificate from the keystore:
+        <pre>
+        keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file</pre>
+
+        Then sign it with the CA:
+        <pre>
+        openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}</pre>
+
+        Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
+        <pre>
+        keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
+        keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed</pre>
+
+        The definitions of the parameters are the following:
+        <ol>
+            <li>keystore: the location of the keystore</li>
+            <li>ca-cert: the certificate of the CA</li>
+            <li>ca-key: the private key of the CA</li>
+            <li>ca-password: the passphrase of the CA</li>
+            <li>cert-file: the exported, unsigned certificate of the server</li>
+            <li>cert-signed: the signed certificate of the server</li>
+        </ol>
+
+        Here is an example of a bash script with all of the above steps. Note that one of the commands assumes a password of <code>test1234</code>, so either use that password or edit the command before running it.
+            <pre>
+        #!/bin/bash
+        #Step 1
+        keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
+        #Step 2
+        openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
+        keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
+        keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
+        #Step 3
+        keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
+        openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
+        keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
+        keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed</pre></li>
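+
+        If client authentication will be enabled on the brokers (ssl.client.auth set to "requested" or "required"), each client also needs its own signed keystore. A minimal sketch following the same steps, assuming the same CA and the password test1234 (file names are illustrative):
+            <pre>
+        keytool -keystore client.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
+        keytool -keystore client.keystore.jks -alias localhost -certreq -file client-cert-file
+        openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file -out client-cert-signed -days 365 -CAcreateserial -passin pass:test1234
+        keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
+        keytool -keystore client.keystore.jks -alias localhost -import -file client-cert-signed</pre>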
+    <li><h4><a id="security_configbroker" href="#security_configbroker">Configuring Kafka Brokers</a></h4>
+        Kafka Brokers support listening for connections on multiple ports.
+        We need to configure the following property in server.properties, which must have one or more comma-separated values:
+        <pre>listeners</pre>
+
+        If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
+        <pre>
+        listeners=PLAINTEXT://host.name:port,SSL://host.name:port</pre>
+
+        The following SSL configs are needed on the broker side:
+        <pre>
+        ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
+        ssl.keystore.password=test1234
+        ssl.key.password=test1234
+        ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
+        ssl.truststore.password=test1234</pre>
+
+        Optional settings that are worth considering:
+        <ol>
+            <li>ssl.client.auth=none ("required" => client authentication is required, "requested" => client authentication is requested and a client without certs can still connect. The use of "requested" is discouraged, as it provides a false sense of security and misconfigured clients will still connect successfully.)</li>
+            <li>ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. (Default is an empty list)</li>
+            <li>ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (list out the SSL protocols that you are going to accept from clients. Do note that SSL is deprecated in favor of TLS and using SSL in production is not recommended)</li>
+            <li>ssl.keystore.type=JKS</li>
+            <li>ssl.truststore.type=JKS</li>
+            <li>ssl.secure.random.implementation=SHA1PRNG</li>
+        </ol>
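+        Taken together, a hypothetical fragment that requires client authentication and accepts only TLSv1.2 could add the following to the broker settings above (values shown are examples, not defaults):
+        <pre>
+        ssl.client.auth=required
+        ssl.enabled.protocols=TLSv1.2
+        ssl.keystore.type=JKS
+        ssl.truststore.type=JKS</pre>
+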
+        If you want to enable SSL for inter-broker communication, add the following to the broker properties file (it defaults to PLAINTEXT):
+        <pre>
+        security.inter.broker.protocol=SSL</pre>
+
+        <p>
+        Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the <a href="http://www.oracle.com/technetwork/java/javase/downloads/index.html">JCE Unlimited Strength Jurisdiction Policy Files</a> must be obtained and installed in the JDK/JRE. See the
+        <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/SunProviders.html">JCA Providers Documentation</a> for more information.
+        </p>
+
+        <p>
+        The JRE/JDK will have a default pseudo-random number generator (PRNG) that is used for cryptography operations, so it is not required to configure the
+        implementation used with <code>ssl.secure.random.implementation</code>. However, there are performance issues with some implementations (notably, the
+        default chosen on Linux systems, <code>NativePRNG</code>, utilizes a global lock). In cases where performance of SSL connections becomes an issue,
+        consider explicitly setting the implementation to be used. The <code>SHA1PRNG</code> implementation is non-blocking, and has shown very good performance
+        characteristics under heavy load (50 MB/sec of produced messages, plus replication traffic, per broker).
+        </p>
+
+        Once you start the broker you should be able to see a message like the following in server.log:
+        <pre>
+        with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)</pre>
+
+        To check quickly if the server keystore and truststore are set up properly you can run the following command:
+        <pre>openssl s_client -debug -connect localhost:9093 -tls1</pre> (Note: TLSv1 should be listed under ssl.enabled.protocols)<br>
+        In the output of this command you should see the server's certificate:
+        <pre>
+        -----BEGIN CERTIFICATE-----
+        {variable sized random bytes}
+        -----END CERTIFICATE-----
+        subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
+        issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com</pre>
+        If the certificate does not show up or if there are any other error messages then your keystore is not set up properly.</li>
+
+    <li><h4><a id="security_configclients" href="#security_configclients">Configuring Kafka Clients</a></h4>
+        SSL is supported only for the new Kafka producer and consumer; the older APIs are not supported. The configs for SSL are the same for both producer and consumer.<br>
+        If client authentication is not required in the broker, then the following is a minimal configuration example:
+        <pre>
+        security.protocol=SSL
+        ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
+        ssl.truststore.password=test1234</pre>
+
+        If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
+        <pre>
+        ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
+        ssl.keystore.password=test1234
+        ssl.key.password=test1234</pre>
+        Other configuration settings that may also be needed depending on your requirements and the broker configuration:
+            <ol>
+                <li>ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.</li>
+                <li>ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.</li>
+                <li>ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should list at least one of the protocols configured on the broker side</li>
+                <li>ssl.truststore.type=JKS</li>
+                <li>ssl.keystore.type=JKS</li>
+            </ol>
+<br>
+        Examples using console-producer and console-consumer:
+        <pre>
+        kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
+        kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</pre>
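+        where <code>client-ssl.properties</code> is assumed to contain the client-side SSL settings described above, for example:
+        <pre>
+        security.protocol=SSL
+        ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
+        ssl.truststore.password=test1234</pre>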
+    </li>
+</ol>
+<h3><a id="security_sasl" href="#security_sasl">7.3 Authentication using SASL</a></h3>
+
+<ol>
+  <li><h4><a id="security_sasl_brokerconfig"
+    href="#security_sasl_brokerconfig">SASL configuration for Kafka brokers</a></h4>
+    <ol>
+      <li>Select one or more supported mechanisms to enable in the broker. <tt>GSSAPI</tt>
+        and <tt>PLAIN</tt> are the mechanisms currently supported in Kafka.</li>
+      <li>Add a JAAS config file for the selected mechanisms as described in the examples
+        for setting up <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>
+        or <a href="#security_sasl_plain_brokerconfig">PLAIN</a>.</li>
+      <li>Pass the JAAS config file location as JVM parameter to each Kafka broker.
+        For example:
+        <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre></li>
+      <li>Configure a SASL port in server.properties, by adding at least one of
+        SASL_PLAINTEXT or SASL_SSL to the <i>listeners</i> parameter, which
+        contains one or more comma-separated values:
+        <pre>    listeners=SASL_PLAINTEXT://host.name:port</pre>
+        If SASL_SSL is used, then <a href="#security_ssl">SSL must also be
+        configured</a>. If you are only configuring a SASL port (or if you want
+        the Kafka brokers to authenticate each other using SASL) then make sure
+        you set the same SASL protocol for inter-broker communication:
+        <pre>    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)</pre></li>
+      <li>Enable one or more SASL mechanisms in server.properties:
+          <pre>    sasl.enabled.mechanisms=GSSAPI (,PLAIN)</pre></li>
+      <li>Configure the SASL mechanism for inter-broker communication in server.properties
+        if using SASL for inter-broker communication:
+        <pre>    sasl.mechanism.inter.broker.protocol=GSSAPI (or PLAIN)</pre></li>
+      <li>Follow the steps in <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>
+        or <a href="#security_sasl_plain_brokerconfig">PLAIN</a> to configure SASL
+        for the enabled mechanisms. To enable multiple mechanisms in the broker, follow
+        the steps <a href="#security_sasl_multimechanism">here</a>.</li>
+      <u><a id="security_sasl_brokernotes" href="#security_sasl_brokernotes">Important notes:</a></u>
+      <ol>
+        <li><tt>KafkaServer</tt> is the section name in the JAAS file used by each
+          KafkaServer/Broker. This section provides SASL configuration options
+          for the broker including any SASL client connections made by the broker
+          for inter-broker communication.</li>
+        <li>The <tt>Client</tt> section is used to authenticate a SASL connection with
+          ZooKeeper. It also allows the brokers to set a SASL ACL on ZooKeeper
+          nodes, which locks these nodes down so that only the brokers can
+          modify them. It is necessary to have the same principal name across all
+          brokers. If you want to use a section name other than Client, set the
+          system property <tt>zookeeper.sasl.client</tt> to the appropriate
+          name (<i>e.g.</i>, <tt>-Dzookeeper.sasl.client=ZkClient</tt>).</li>
+        <li>ZooKeeper uses "zookeeper" as the service name by default. If you
+          want to change this, set the system property
+          <tt>zookeeper.sasl.client.username</tt> to the appropriate name
+          (<i>e.g.</i>, <tt>-Dzookeeper.sasl.client.username=zk</tt>).</li>
+      </ol>
+    </ol>
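+      Putting the above steps together, a hypothetical server.properties fragment for a broker using SASL_SSL with GSSAPI for both clients and inter-broker traffic might contain the following (the SSL settings from <a href="#security_ssl">section 7.2</a> are also required but omitted here):
+      <pre>    listeners=SASL_SSL://host.name:port
+    security.inter.broker.protocol=SASL_SSL
+    sasl.mechanism.inter.broker.protocol=GSSAPI
+    sasl.enabled.mechanisms=GSSAPI</pre>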
+  </li>
+  <li><h4><a id="security_sasl_clientconfig"
+    href="#security_sasl_clientconfig">SASL configuration for Kafka clients</a></h4>
+    SASL authentication is only supported for the new Java Kafka producer and
+    consumer; the older APIs are not supported. To configure SASL authentication
+    on the clients:
+    <ol>
+      <li>Select a SASL mechanism for authentication.</li>
+      <li>Add a JAAS config file for the selected mechanism as described in the examples
+        for setting up <a href="#security_sasl_kerberos_clientconfig">GSSAPI (Kerberos)</a>
+        or <a href="#security_sasl_plain_clientconfig">PLAIN</a>. <tt>KafkaClient</tt> is the
+        section name in the JAAS file used by Kafka clients.</li>
+      <li>Pass the JAAS config file location as JVM parameter to each client JVM. For example:
+        <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</pre></li>
+      <li>Configure the following properties in producer.properties or
+        consumer.properties:
+        <pre>    security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+    sasl.mechanism=GSSAPI (or PLAIN)</pre></li>
+      <li>Follow the steps in <a href="#security_sasl_kerberos_clientconfig">GSSAPI (Kerberos)</a>
+        or <a href="#security_sasl_plain_clientconfig">PLAIN</a> to configure SASL
+        for the selected mechanism.</li>
+    </ol>
+  </li>
+  <li><h4><a id="security_sasl_kerberos" href="#security_sasl_kerberos">Authentication using SASL/Kerberos</a></h4>
+    <ol>
+      <li><h5><a id="security_sasl_kerberos_prereq" href="#security_sasl_kerberos_prereq">Prerequisites</a></h5>
+      <ol>
+          <li><b>Kerberos</b><br>
+          If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one; your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (<a href="https://help.ubuntu.com/community/Kerberos">Ubuntu</a>, <a href="https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Managing_Smart_Cards/installing-kerberos.html">Redhat</a>). Note that if you are using Oracle Java, you will need to download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security.</li>
+          <li><b>Create Kerberos Principals</b><br>
+          If you are using your organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).<br>
+          If you have installed your own Kerberos, you will need to create these principals yourself using the following commands (a filled-in example follows this list):
+            <pre>
+    sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
+    sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"</pre></li>
+          <li><b>Make sure all hosts are reachable using hostnames</b> - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.</li>
+      </ol>
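+      For example, creating the broker principal and keytab used in the JAAS examples below (assuming the realm EXAMPLE.COM and the broker host kafka1.hostname.com) would look like:
+            <pre>
+    sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/kafka1.hostname.com@EXAMPLE.COM'
+    sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/kafka_server.keytab kafka/kafka1.hostname.com@EXAMPLE.COM"</pre>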
+      <li><h5><a id="security_sasl_kerberos_brokerconfig" href="#security_sasl_kerberos_brokerconfig">Configuring Kafka Brokers</a></h5>
+      <ol>
+          <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab):
+          <pre>
+    KafkaServer {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/etc/security/keytabs/kafka_server.keytab"
+        principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
+    };
+
+    // Zookeeper client authentication
+    Client {
+       com.sun.security.auth.module.Krb5LoginModule required
+       useKeyTab=true
+       storeKey=true
+       keyTab="/etc/security/keytabs/kafka_server.keytab"
+       principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
+    };</pre>
+
+          </li>
+          The <tt>KafkaServer</tt> section in the JAAS file tells the broker which principal to use and the location of the keytab where this principal is stored. It
+          allows the broker to log in using the keytab specified in this section. See <a href="#security_sasl_brokernotes">notes</a> for more details on ZooKeeper SASL configuration.
+          <li>Pass the JAAS and optionally the krb5 file locations as JVM parameters to each Kafka broker (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details): 
+            <pre>    -Djava.security.krb5.conf=/etc/kafka/krb5.conf
+    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre>
+          </li>
+          <li>Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the Kafka broker.</li>
+          <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
+          <pre>    listeners=SASL_PLAINTEXT://host.name:port
+    security.inter.broker.protocol=SASL_PLAINTEXT
+    sasl.mechanism.inter.broker.protocol=GSSAPI
+    sasl.enabled.mechanisms=GSSAPI
+          </pre>
+          </li>
+          We must also configure the service name in server.properties, which should match the principal name of the Kafka brokers. In the above example, the principal is "kafka/kafka1.hostname.com@EXAMPLE.COM", so:
+          <pre>    sasl.kerberos.service.name=kafka</pre>
+
+      </ol></li>
+      <li><h5><a id="security_sasl_kerberos_clientconfig" href="#security_sasl_kerberos_clientconfig">Configuring Kafka Clients</a></h5>
+          To configure SASL authentication on the clients:
+          <ol>
+              <li>
+                  Clients (producers, consumers, connect workers, etc) will authenticate to the cluster with their own principal (usually with the same name as the user running the client), so obtain or create these principals as needed. Then create a JAAS file for each principal.
+                  The KafkaClient section describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client using a keytab (recommended for long-running processes):
+              <pre>
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/etc/security/keytabs/kafka_client.keytab"
+        principal="kafka-client-1@EXAMPLE.COM";
+    };</pre>
+
+              For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used along with "useTicketCache=true" as in:
+              <pre>
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useTicketCache=true;
+    };</pre>
+              </li>
+              <li>Pass the JAAS and optionally krb5 file locations as JVM parameters to each client JVM (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details): 
+              <pre>    -Djava.security.krb5.conf=/etc/kafka/krb5.conf
+    -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</pre></li>
+              <li>Make sure the keytabs configured in the kafka_client_jaas.conf are readable by the operating system user who is starting the Kafka client.</li>
+              <li>Configure the following properties in producer.properties or consumer.properties: 
+              <pre>    security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+    sasl.mechanism=GSSAPI
+    sasl.kerberos.service.name=kafka</pre></li>
+          </ol>
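+          Putting these together, a hypothetical way to launch the console producer with Kerberos authentication (assuming the standard shell scripts, which pick up extra JVM options from the <code>KAFKA_OPTS</code> environment variable) is:
+              <pre>
+    export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/kafka/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
+    kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config client-sasl.properties</pre>
+          Here <code>client-sasl.properties</code> is a hypothetical file containing the three properties listed above.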
+      </li>
+    </ol>
+  </li>
+      
+  <li><h4><a id="security_sasl_plain" href="#security_sasl_plain">Authentication using SASL/PLAIN</a></h4>
+    <p>SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication.
+       Kafka supports a default implementation for SASL/PLAIN which can be extended for production use as described <a href="#security_sasl_plain_production">here</a>.</p>
+       The username is used as the authenticated <code>Principal</code> for configuration of ACLs etc.
+    <ol>
+      <li><h5><a id="security_sasl_plain_brokerconfig" href="#security_sasl_plain_brokerconfig">Configuring Kafka Brokers</a></h5>
+        <ol>
+          <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
+            <pre>
+    KafkaServer {
+        org.apache.kafka.common.security.plain.PlainLoginModule required
+        username="admin"
+        password="admin-secret"
+        user_admin="admin-secret"
+        user_alice="alice-secret";
+    };</pre>
+            This configuration defines two users (<i>admin</i> and <i>alice</i>). The properties <tt>username</tt> and <tt>password</tt>
+            in the <tt>KafkaServer</tt> section are used by the broker to initiate connections to other brokers. In this example,
+            <i>admin</i> is the user for inter-broker communication. The set of properties <tt>user_<i>userName</i></tt> defines
+            the passwords for all users that connect to the broker and the broker validates all client connections including
+            those from other brokers using these properties.</li>
+          <li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
+              <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre></li>
+          <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
+            <pre>    listeners=SASL_SSL://host.name:port
+    security.inter.broker.protocol=SASL_SSL
+    sasl.mechanism.inter.broker.protocol=PLAIN
+    sasl.enabled.mechanisms=PLAIN</pre></li>
+        </ol>
+      </li>
+
+      <li><h5><a id="security_sasl_plain_clientconfig" href="#security_sasl_plain_clientconfig">Configuring Kafka Clients</a></h5>
+        To configure SASL authentication on the clients:
+        <ol>
+          <li>The <tt>KafkaClient</tt> section describes how the clients like producer and consumer can connect to the Kafka Broker.
+          The following is an example configuration for a client for the PLAIN mechanism:
+            <pre>
+    KafkaClient {
+        org.apache.kafka.common.security.plain.PlainLoginModule required
+        username="alice"
+        password="alice-secret";
+    };</pre>
+            The properties <tt>username</tt> and <tt>password</tt> in the <tt>KafkaClient</tt> section are used by clients to configure
+            the user for client connections. In this example, clients connect to the broker as user <i>alice</i>.
+          </li>
+          <li>Pass the JAAS config file location as JVM parameter to each client JVM:
+            <pre>    -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</pre></li>
+          <li>Configure the following properties in producer.properties or consumer.properties:
+            <pre>    security.protocol=SASL_SSL
+    sasl.mechanism=PLAIN</pre></li>
+        </ol>
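+        Since SASL_SSL is used in the example above, the client-side SSL settings from <a href="#security_ssl">section 7.2</a> are needed as well. A hypothetical complete set of client properties for the PLAIN mechanism might therefore be:
+        <pre>    security.protocol=SASL_SSL
+    sasl.mechanism=PLAIN
+    ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
+    ssl.truststore.password=test1234</pre>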
+      </li>
+      <li><h5><a id="security_sasl_plain_production" href="#security_sasl_plain_production">Use of SASL/PLAIN in production</a></h5>
+        <ul>
+          <li>SASL/PLAIN should be used only with SSL as transport layer to ensure that clear passwords are not transmitted on the wire without encryption.</li>
+          <li>The default implementation of SASL/PLAIN in Kafka specifies usernames and passwords in the JAAS configuration file as shown
+            <a href="#security_sasl_plain_brokerconfig">here</a>. To avoid storing passwords on disk, you can plug in your own implementation of
+            <code>javax.security.auth.spi.LoginModule</code> that provides usernames and passwords from an external source. The login module implementation should
+            provide username as the public credential and password as the private credential of the <code>Subject</code>. The default implementation
+            <code>org.apache.kafka.common.security.plain.PlainLoginModule</code> can be used as an example.</li>
+          <li>In production systems, external authentication servers may implement password authentication. Kafka brokers can be integrated with these servers by adding
+            your own implementation of <code>javax.security.sasl.SaslServer</code>. The default implementation included in Kafka in the package
+            <code>org.apache.kafka.common.security.plain</code> can be used as an example to get started.
+            <ul>
+              <li>New providers must be installed and registered in the JVM. Providers can be installed by adding provider classes to
+              the normal <tt>CLASSPATH</tt> or bundled as a jar file and added to <tt><i>JAVA_HOME</i>/lib/ext</tt>.</li>
+              <li>Providers can be registered statically by adding a provider to the security properties file
+              <tt><i>JAVA_HOME</i>/lib/security/java.security</tt>.
+              <pre>    security.provider.n=providerClassName</pre>
+              where <i>providerClassName</i> is the fully qualified name of the new provider and <i>n</i> is the preference order with
+              lower numbers indicating higher preference.</li>
+              <li>Alternatively, you can register providers dynamically at runtime by invoking <code>Security.addProvider</code> at the beginning of the client
+              application or in a static initializer in the login module. For example:
+              <pre>    Security.addProvider(new PlainSaslServerProvider());</pre></li>
+              <li>For more details, see <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/security/crypto/CryptoSpec.html">JCA Reference</a>.</li>
+            </ul>
+          </li>
+        </ul>
+      </li>
+    </ol>
+  </li>
+  <li><h4><a id="security_sasl_multimechanism" href="#security_sasl_multimechanism">Enabling multiple SASL mechanisms in a broker</a></h4>
+    <ol>
+      <li>Specify configuration for the login modules of all enabled mechanisms in the <tt>KafkaServer</tt> section of the JAAS config file. For example:
+        <pre>
+    KafkaServer {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/etc/security/keytabs/kafka_server.keytab"
+        principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
+
+        org.apache.kafka.common.security.plain.PlainLoginModule required
+        username="admin"
+        password="admin-secret"
+        user_admin="admin-secret"
+        user_alice="alice-secret";
+    };</pre></li>
+      <li>Enable the SASL mechanisms in server.properties: <pre>    sasl.enabled.mechanisms=GSSAPI,PLAIN</pre></li>
+      <li>Specify the SASL security protocol and mechanism for inter-broker communication in server.properties if required:
+        <pre>    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
+    sasl.mechanism.inter.broker.protocol=GSSAPI (or PLAIN)</pre></li>
+      <li>Follow the mechanism-specific steps in <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>
+          and <a href="#security_sasl_plain_brokerconfig">PLAIN</a> to configure SASL for the enabled mechanisms.</li>
+    </ol>
+  </li>
+  <li><h4><a id="saslmechanism_rolling_upgrade" href="#saslmechanism_rolling_upgrade">Modifying SASL mechanism in a Running Cluster</a></h4>
+    <p>The SASL mechanism can be modified in a running cluster using the following sequence (an example of the corresponding property changes follows the list):</p>
+    <ol>
+      <li>Enable new SASL mechanism by adding the mechanism to <tt>sasl.enabled.mechanisms</tt> in server.properties for each broker. Update JAAS config file to include both
+        mechanisms as described <a href="#security_sasl_multimechanism">here</a>. Incrementally bounce the cluster nodes.</li>
+      <li>Restart clients using the new mechanism.</li>
+      <li>To change the mechanism of inter-broker communication (if this is required), set <tt>sasl.mechanism.inter.broker.protocol</tt> in server.properties to the new mechanism and
+        incrementally bounce the cluster again.</li>
+      <li>To remove old mechanism (if this is required), remove the old mechanism from <tt>sasl.enabled.mechanisms</tt> in server.properties and remove the entries for the
+        old mechanism from JAAS config file. Incrementally bounce the cluster again.</li>
+    </ol>
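+    As an illustration, switching a cluster from GSSAPI to PLAIN might step through the following server.properties changes, with one incremental bounce per step (the JAAS file changes described above are assumed):
+    <pre>    # Bounce 1: enable the new mechanism alongside the old one
+    sasl.enabled.mechanisms=GSSAPI,PLAIN
+    # Bounce 2 (after clients have switched to PLAIN): change the inter-broker mechanism, if used
+    sasl.mechanism.inter.broker.protocol=PLAIN
+    # Bounce 3: remove the old mechanism
+    sasl.enabled.mechanisms=PLAIN</pre>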
+  </li>
+</ol>
+
+<h3><a id="security_authz" href="#security_authz">7.4 Authorization and ACLs</a></h3>
+Kafka ships with a pluggable Authorizer and an out-of-the-box authorizer implementation that uses ZooKeeper to store all the acls. Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R". You can read more about the acl structure on KIP-11. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if a Resource R has no associated acls, no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in broker.properties.
+<pre>allow.everyone.if.no.acl.found=true</pre>
+One can also add super users in broker.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma).
+<pre>super.users=User:Bob;User:Alice</pre>
+By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in broker.properties like the following.
+<pre>principal.builder.class=CustomizedPrincipalBuilderClass</pre>
+By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting <code>sasl.kerberos.principal.to.local.rules</code> to a customized rule in broker.properties.
+The format of <code>sasl.kerberos.principal.to.local.rules</code> is a list where each rule works in the same way as the auth_to_local in the <a href="http://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html">Kerberos configuration file (krb5.conf)</a>. Each rule starts with RULE: and contains an expression in the format [n:string](regexp)s/pattern/replacement/g. See the Kerberos documentation for more details. An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
+<pre>sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT</pre>
+
+<h4><a id="security_authz_cli" href="#security_authz_cli">Command Line Interface</a></h4>
+The Kafka Authorization management CLI can be found under the bin directory with all the other CLIs. The CLI script is called <b>kafka-acls.sh</b>. The following lists all the options that the script supports:
+<p></p>
+<table class="data-table">
+    <tr>
+        <th>Option</th>
+        <th>Description</th>
+        <th>Default</th>
+        <th>Option type</th>
+    </tr>
+    <tr>
+        <td>--add</td>
+        <td>Indicates to the script that user is trying to add an acl.</td>
+        <td></td>
+        <td>Action</td>
+    </tr>
+    <tr>
+        <td>--remove</td>
+        <td>Indicates to the script that user is trying to remove an acl.</td>
+        <td></td>
+        <td>Action</td>
+    </tr>
+    <tr>
+        <td>--list</td>
+        <td>Indicates to the script that user is trying to list acls.</td>
+        <td></td>
+        <td>Action</td>
+    </tr>
+    <tr>
+        <td>--authorizer</td>
+        <td>Fully qualified class name of the authorizer.</td>
+        <td>kafka.security.auth.SimpleAclAuthorizer</td>
+        <td>Configuration</td>
+    </tr>
+    <tr>
+        <td>--authorizer-properties</td>
+        <td>key=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181</td>
+        <td></td>
+        <td>Configuration</td>
+    </tr>
+    <tr>
+        <td>--cluster</td>
+        <td>Specifies cluster as resource.</td>
+        <td></td>
+        <td>Resource</td>
+    </tr>
+    <tr>
+        <td>--topic [topic-name]</td>
+        <td>Specifies the topic as resource.</td>
+        <td></td>
+        <td>Resource</td>
+    </tr>
+    <tr>
+        <td>--group [group-name]</td>
+        <td>Specifies the consumer-group as resource.</td>
+        <td></td>
+        <td>Resource</td>
+    </tr>
+    <tr>
+        <td>--allow-principal</td>
+        <td>Principal is in PrincipalType:name format that will be added to ACL with Allow permission. <br>You can specify multiple --allow-principal in a single command.</td>
+        <td></td>
+        <td>Principal</td>
+    </tr>
+    <tr>
+        <td>--deny-principal</td>
+        <td>Principal is in PrincipalType:name format that will be added to ACL with Deny permission. <br>You can specify multiple --deny-principal in a single command.</td>
+        <td></td>
+        <td>Principal</td>
+    </tr>
+    <tr>
+        <td>--allow-host</td>
+        <td>IP address from which principals listed in --allow-principal will have access.</td>
+        <td>If --allow-principal is specified, defaults to *, which translates to "all hosts".</td>
+        <td>Host</td>
+    </tr>
+    <tr>
+        <td>--deny-host</td>
+        <td>IP address from which principals listed in --deny-principal will be denied access.</td>
+        <td>If --deny-principal is specified, defaults to *, which translates to "all hosts".</td>
+        <td>Host</td>
+    </tr>
+    <tr>
+        <td>--operation</td>
+        <td>Operation that will be allowed or denied.<br>
+            Valid values are : Read, Write, Create, Delete, Alter, Describe, ClusterAction, All</td>
+        <td>All</td>
+        <td>Operation</td>
+    </tr>
+    <tr>
+        <td>--producer</td>
+        <td>Convenience option to add/remove acls for the producer role. This will generate acls that allow WRITE and
+            DESCRIBE on the topic and CREATE on the cluster.</td>
+        <td></td>
+        <td>Convenience</td>
+    </tr>
+    <tr>
+        <td>--consumer</td>
+        <td>Convenience option to add/remove acls for the consumer role. This will generate acls that allow READ and
+            DESCRIBE on the topic and READ on the consumer-group.</td>
+        <td></td>
+        <td>Convenience</td>
+    </tr>
+    <tr>
+        <td>--force</td>
+        <td> Convenience option to assume yes to all queries and do not prompt.</td>
+        <td></td>
+        <td>Convenience</td>
+    </tr>
+</table>
+
+<h4><a id="security_authz_examples" href="#security_authz_examples">Examples</a></h4>
+<ul>
+    <li><b>Adding Acls</b><br>
+Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic Test-topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the CLI with the following options:
+        <pre>bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic</pre>
+        By default, all principals that don't have an explicit acl allowing access to a resource for an operation are denied. In rare cases where an allow acl is defined that allows access to all but some principal, we will have to use the --deny-principal and --deny-host options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so using the following command:
+        <pre>bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic</pre>
+        Note that <code>--allow-host</code> and <code>--deny-host</code> only support IP addresses (hostnames are not supported).
+        The above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly, users can add acls to the cluster by specifying --cluster and to a consumer group by specifying --group [group-name].</li>
+
+    <li><b>Removing Acls</b><br>
+            Removing acls is much the same; the only difference is that users specify the --remove option instead of --add. To remove the acls added by the first example above, we can execute the CLI with the following options:
+           <pre> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic </pre></li>
+
+    <li><b>List Acls</b><br>
+            We can list acls for any resource by specifying the --list option with the resource. To list all acls for Test-topic we can execute the CLI with the following options:
+            <pre>bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic</pre></li>
+
+    <li><b>Adding or removing a principal as producer or consumer</b><br>
+            The most common use case for acl management is adding/removing a principal as a producer or consumer, so we added convenience options to handle these cases. In order to add User:Bob as a producer of Test-topic we can execute the following command:
+           <pre> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic</pre>
+            Similarly, to add User:Bob as a consumer of Test-topic with consumer group Group-1 we just have to pass the --consumer option:
+           <pre> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
+            Note that for the consumer option we must also specify the consumer group.
+            In order to remove a principal from a producer or consumer role we just need to pass the --remove option, as shown in the example after this list. </li>
+    </ul>
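+    For example, to later remove the producer acls added for User:Bob above, the same convenience option is used with --remove:
+   <pre> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --producer --topic Test-topic</pre>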
+
+<h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5 Incorporating Security Features in a Running Cluster</a></h3>
+    You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases:
+    <p></p>
+    <ul>
+        <li>Incrementally bounce the cluster nodes to open additional secured port(s).</li>
+        <li>Restart clients using the secured rather than PLAINTEXT port (assuming you are securing the client-broker connection).</li>
+        <li>Incrementally bounce the cluster again to enable broker-to-broker security (if this is required)</li>
+        <li>A final incremental bounce to close the PLAINTEXT port.</li>
+    </ul>
+    <p></p>
+    The specific steps for configuring SSL and SASL are described in sections <a href="#security_ssl">7.2</a> and <a href="#security_sasl">7.3</a>.
+    Follow these steps to enable security for your desired protocol(s).
+    <p></p>
+    The security implementation lets you configure different protocols for both broker-client and broker-broker communication.
+    These must be enabled in separate bounces. A PLAINTEXT port must be left open throughout so brokers and/or clients can continue to communicate.
+    <p></p>
+
+    When performing an incremental bounce, stop the brokers cleanly via a SIGTERM. It's also good practice to wait for restarted replicas to return to the ISR list before moving on to the next node.
+    <p></p>
+    As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, an SSL port is opened on each node:
+          <pre>
+         listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092</pre>
+
+    We then restart the clients, changing their config to point at the newly opened, secured port:
+
+          <pre>
+        bootstrap.servers = [broker1:9092,...]
+        security.protocol = SSL
+        ...etc</pre>
+
+    In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
+
+          <pre>
+        listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+        security.inter.broker.protocol=SSL</pre>
+
+    In the final bounce we secure the cluster by closing the PLAINTEXT port:
+
+          <pre>
+        listeners=SSL://broker1:9092
+        security.inter.broker.protocol=SSL</pre>
+
+    Alternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we'd like to add SASL authentication to the broker-client connection also. We would achieve this by opening two additional ports during the first bounce:
+
+          <pre>
+        listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093</pre>
+
+    We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
+
+          <pre>
+        bootstrap.servers = [broker1:9093,...]
+        security.protocol = SASL_SSL
+        ...etc</pre>
+
+    The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened (port 9092):
+
+          <pre>
+        listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+        security.inter.broker.protocol=SSL</pre>
+
+    The final bounce secures the cluster by closing the PLAINTEXT port.
+
+          <pre>
+       listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
+       security.inter.broker.protocol=SSL</pre>
+
+    ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section <a href="#zk_authz_migration">7.6.2</a>.
+
+
+<h3><a id="zk_authz" href="#zk_authz">7.6 ZooKeeper Authentication</a></h3>
+<h4><a id="zk_authz_new" href="#zk_authz_new">7.6.1 New clusters</a></h4>
+To enable ZooKeeper authentication on brokers, there are two necessary steps:
+<ol>
+	<li> Create a JAAS login file and set the appropriate system property to point to it as described above</li>
+	<li> Set the configuration property <tt>zookeeper.set.acl</tt> in each broker to true</li>
+</ol>
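+
+For example, a broker could reuse the JAAS file from the <a href="#security_sasl_kerberos_brokerconfig">SASL/Kerberos broker example</a> above (its <tt>Client</tt> section is the one used for ZooKeeper) by passing the JVM parameter
+<pre>
+    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</pre>
+and setting the following in server.properties:
+<pre>
+    zookeeper.set.acl=true</pre>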
+
+The metadata stored in ZooKeeper for the Kafka cluster is world-readable, but can only be modified by the brokers. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of that data can cause cluster disruption. We also recommend limiting the access to ZooKeeper via network segmentation (only brokers and some admin tools need access to ZooKeeper if the new Java consumer and producer clients are used).
+
+<h4><a id="zk_authz_migration" href="#zk_authz_migration">7.6.2 Migrating clusters</a></h4>
+If you are running a version of Kafka that does not support security, or simply have security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
+<ol>
+	<li>Perform a rolling restart setting the JAAS login file, which enables brokers to authenticate. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs</li>
+	<li>Perform a second rolling restart of brokers, this time setting the configuration parameter <tt>zookeeper.set.acl</tt> to true, which enables the use of secure ACLs when creating znodes</li>
+	<li>Execute the ZkSecurityMigrator tool by running the script <tt>./bin/zookeeper-security-migration.sh</tt> with <tt>zookeeper.acl</tt> set to secure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes</li>
+</ol>
+<p>It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:</p>
+<ol>
+	<li>Perform a rolling restart of brokers setting the JAAS login file, which enables brokers to authenticate, but setting <tt>zookeeper.set.acl</tt> to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes</li>
+	<li>Execute the ZkSecurityMigrator tool by running the script <tt>./bin/zookeeper-security-migration.sh</tt> with <tt>zookeeper.acl</tt> set to unsecure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes</li>
+	<li>Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file</li>
+</ol>
+Here is an example of how to run the migration tool:
+<pre>
+./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connection=localhost:2181
+</pre>
+<p>Run this to see the full list of parameters:</p>
+<pre>
+./bin/zookeeper-security-migration.sh --help
+</pre>
+<h4><a id="zk_authz_ensemble" href="#zk_authz_ensemble">7.6.3 Migrating the ZooKeeper ensemble</a></h4>
+It is also necessary to enable authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the servers and set a few properties. Please refer to the ZooKeeper documentation for more detail:
+<ol>
+	<li><a href="http://zookeeper.apache.org/doc/r3.4.8/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
+	<li><a href="https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zookeeper+and+SASL">Apache ZooKeeper wiki</a></li>
+</ol>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ed0bb0d9/0101/streams.html
----------------------------------------------------------------------
diff --git a/0101/streams.html b/0101/streams.html
new file mode 100644
index 0000000..9c21ec4
--- /dev/null
+++ b/0101/streams.html
@@ -0,0 +1,342 @@
+<!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~-->
+
+<h3><a id="streams_overview" href="#streams_overview">9.1 Overview</a></h3>
+
+<p>
+Kafka Streams is a client library for processing and analyzing data stored in Kafka and for either writing the resulting data back to Kafka or sending the final output to an external system. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state.
+Kafka Streams has a <b>low barrier to entry</b>: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka's parallelism model.
+</p>
+<p>
+Some highlights of Kafka Streams:
+</p>
+
+<ul>
+    <li>Designed as a <b>simple and lightweight client library</b>, which can be easily embedded in any Java application and integrated with any existing packaging, deployment and operational tools that users have for their streaming applications.</li>
+    <li>Has <b>no external dependencies on systems other than Apache Kafka itself</b> as the internal messaging layer; notably, it uses Kafka's partitioning model to horizontally scale processing while maintaining strong ordering guarantees.</li>
+    <li>Supports <b>fault-tolerant local state</b>, which enables very fast and efficient stateful operations like joins and windowed aggregations.</li>
+    <li>Employs <b>one-record-at-a-time processing</b> to achieve low processing latency, and supports <b>event-time based windowing operations</b>.</li>
+    <li>Offers necessary stream processing primitives, along with a <b>high-level Streams DSL</b> and a <b>low-level Processor API</b>.</li>
+
+</ul>
+
+<h3><a id="streams_developer" href="#streams_developer">9.2 Developer Guide</a></h3>
+
+<p>
+There is a <a href="#quickstart_kafkastreams">quickstart</a> example that shows how to run a stream processing program coded in the Kafka Streams library.
+This section focuses on how to write, configure, and execute a Kafka Streams application.
+</p>
+
+<h4><a id="streams_concepts" href="#streams_concepts">Core Concepts</a></h4>
+
+<p>
+We first summarize the key concepts of Kafka Streams.
+</p>
+
+<h5><a id="streams_topology" href="#streams_topology">Stream Processing Topology</a></h5>
+
+<ul>
+    <li>A <b>stream</b> is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a <b>data record</b> is defined as a key-value pair.</li>
+    <li>A stream processing application written in Kafka Streams defines its computational logic through one or more <b>processor topologies</b>, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).</li>
+    <li>A <b>stream processor</b> is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and subsequently producing one or more output records for its downstream processors.</li>
+</ul>
+
+<p>
+Kafka Streams offers two ways to define the stream processing topology: the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a> provides
+the most common data transformation operations such as <code>map</code> and <code>filter</code>; the lower-level <a href="#streams_processor"><b>Processor API</b></a> allows
+developers to define and connect custom processors as well as to interact with <a href="#streams_state">state stores</a>.
+</p>
+
+<h5><a id="streams_time" href="#streams_time">Time</a></h5>
+
+<p>
+A critical aspect in stream processing is the notion of <b>time</b>, and how it is modeled and integrated.
+For example, some operations such as <b>windowing</b> are defined based on time boundaries.
+</p>
+<p>
+Common notions of time in streams are:
+</p>
+
+<ul>
+    <li><b>Event time</b> - The point in time when an event or data record occurred, i.e. was originally created "at the source".</li>
+    <li><b>Processing time</b> - The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. The processing time may be milliseconds, hours, or even days later than the original event time.</li>
+</ul>
+
+<p>
+Kafka Streams assigns a <b>timestamp</b> to every data record
+via the <code>TimestampExtractor</code> interface.
+Concrete implementations of this interface may retrieve or compute timestamps based on the actual contents of data records such as an embedded timestamp field
+to provide event-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing,
+thereby yielding processing-time semantics to stream processing applications.
+Developers can thus enforce different notions of time depending on their business needs. For example,
+per-record timestamps describe the progress of a stream with regards to time (although records may be out-of-order within the stream) and
+are leveraged by time-dependent operations such as joins.
+</p>
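+<p>
+As a rough sketch, an event-time extractor that reads a timestamp embedded in the record value could look like the following. The <code>EventRecord</code> value type and its <code>getTimestamp()</code> accessor are hypothetical; such an extractor is registered via the <code>timestamp.extractor</code> configuration property.
+</p>
+<pre>
+    import org.apache.kafka.clients.consumer.ConsumerRecord;
+    import org.apache.kafka.streams.processor.TimestampExtractor;
+
+    public class EmbeddedTimestampExtractor implements TimestampExtractor {
+
+        @Override
+        public long extract(ConsumerRecord<Object, Object> record) {
+            Object value = record.value();
+            // use the timestamp carried inside the payload if present...
+            if (value instanceof EventRecord) {
+                return ((EventRecord) value).getTimestamp();
+            }
+            // ...otherwise fall back to the timestamp set by the producer or broker
+            return record.timestamp();
+        }
+    }
+</pre>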
+
+<h5><a id="streams_state" href="#streams_state">States</a></h5>
+
+<p>
+Some stream processing applications don't require state, which means the processing of a message is independent of
+the processing of all other messages.
+However, being able to maintain state opens up many possibilities for sophisticated stream processing applications: you
+can join input streams, or group and aggregate data records. Many such stateful operators are provided by the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a>.
+</p>
+<p>
+Kafka Streams provides so-called <b>state stores</b>, which can be used by stream processing applications to store and query data.
+This is an important capability when implementing stateful operations.
+Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing.
+These state stores can either be a persistent key-value store, an in-memory hashmap, or another convenient data structure.
+Kafka Streams offers fault-tolerance and automatic recovery for local state stores.
+</p>
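+<p>
+For example, a key-value state store can be defined with the <code>Stores</code> factory (<code>org.apache.kafka.streams.state.Stores</code>) before being attached to a topology. The sketch below shows a persistent store; the store name "word-counts" and the key / value types are example choices only:
+</p>
+<pre>
+    StateStoreSupplier wordCounts = Stores.create("word-counts")
+        .withStringKeys()
+        .withLongValues()
+        .persistent()       // use .inMemory() instead for an in-memory hashmap based store
+        .build();
+</pre>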
+<br>
+<p>
+As we have mentioned above, the computational logic of a Kafka Streams application is defined as a <a href="#streams_topology">processor topology</a>.
+Currently Kafka Streams provides two sets of APIs to define the processor topology, which will be described in the subsequent sections.
+</p>
+
+<h4><a id="streams_processor" href="#streams_processor">Low-Level Processor API</a></h4>
+
+<h5><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h5>
+
+<p>
+Developers can define their customized processing logic by implementing the <code>Processor</code> interface, which
+provides the <code>process</code> and <code>punctuate</code> methods. The <code>process</code> method is invoked on each
+received record, while the <code>punctuate</code> method is invoked periodically based on elapsed time.
+In addition, the processor can keep a reference to the <code>ProcessorContext</code> instance received in the
+<code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to
+forward modified or new key-value pairs to downstream processors (<code>context().forward</code>), to commit the current
+processing progress (<code>context().commit</code>), and so on.
+</p>
+
+<pre>
+    public class MyProcessor implements Processor<String, String> {
+        private ProcessorContext context;
+        private KeyValueStore<String, Integer> kvStore;
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void init(ProcessorContext context) {
+            this.context = context;
+            this.context.schedule(1000);
+            this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
+        }
+
+        @Override
+        public void process(String dummy, String line) {
+            String[] words = line.toLowerCase().split(" ");
+
+            for (String word : words) {
+                Integer oldValue = this.kvStore.get(word);
+
+                if (oldValue == null) {
+                    this.kvStore.put(word, 1);
+                } else {
+                    this.kvStore.put(word, oldValue + 1);
+                }
+            }
+        }
+
+        @Override
+        public void punctuate(long timestamp) {
+            KeyValueIterator<String, Integer> iter = this.kvStore.all();
+
+            while (iter.hasNext()) {
+                KeyValue<String, Integer> entry = iter.next();
+                context.forward(entry.key, entry.value.toString());
+            }
+
+            iter.close();
+            context.commit();
+        }
+
+        @Override
+        public void close() {
+            this.kvStore.close();
+        }
+    }
+</pre>
+
+<p>
+In the above implementation, the following actions are performed:
+
+<ul>
+    <li>In the <code>init</code> method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".</li>
+    <li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).</li>
+    <li>In the <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
+</ul>
+</p>
+
+<h5><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h5>
+
+<p>
+With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
+by connecting these processors together:
+
+<pre>
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.addSource("SOURCE", "src-topic")
+
+        .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
+        .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
+        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+
+        .addSink("SINK1", "sink-topic1", "PROCESS1")
+        .addSink("SINK2", "sink-topic2", "PROCESS2")
+        .addSink("SINK3", "sink-topic3", "PROCESS3");
+</pre>
+
+There are several steps in the above code to build the topology, and here is a quick walk through:
+
+<ul>
+    <li>First of all a source node named "SOURCE" is added to the topology using the <code>addSource</code> method, with one Kafka topic "src-topic" fed to it.</li>
+    <li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.</li>
+    <li>Finally three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li>
+</ul>
+</p>
+
+<h5><a id="streams_processor_statestore" href="#streams_processor_statestore">Local State Store</a></h5>
+
+<p>
+Note that the Processor API is not limited to only accessing the current records as they arrive, but can also maintain local state stores
+that keep recently arrived records to use in stateful processing operations such as aggregation or windowed joins.
+To take advantage of these local state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
+processor topology to create the local state store and associate it with the processor nodes that need to access it; or they can connect an already
+created local state store with existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
+
+<pre>
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.addSource("SOURCE", "src-topic")
+
+        .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
+        // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
+        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
+        .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
+
+        // connect the state store "COUNTS" with processor "PROCESS2"
+        .connectProcessorAndStateStores("PROCESS2", "COUNTS")
+
+        .addSink("SINK1", "sink-topic1", "PROCESS1")
+        .addSink("SINK2", "sink-topic2", "PROCESS2")
+        .addSink("SINK3", "sink-topic3", "PROCESS3");
+</pre>
+
+</p>
+
+In the next section we present another way to build the processor topology: the Kafka Streams DSL.
+
+<h4><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h4>
+
+To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code> class, which extends the <code>TopologyBuilder</code>.
+A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
+through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend that developers read the full example
+source code for details.
+
+<h5><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams from Kafka</a></h5>
+
+<p>
+Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code>)
+can be created as a source stream from one or more Kafka topics (for <code>KTable</code> you can only create the source stream
+from a single topic).
+</p>
+
+<pre>
+    KStreamBuilder builder = new KStreamBuilder();
+
+    KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
+    KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
+</pre>
+
+<h5><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h5>
+
+<p>
+There is a list of transformation operations provided for <code>KStream</code> and <code>KTable</code> respectively.
+Each of these operations may generate one or more <code>KStream</code> or <code>KTable</code> objects and
+is translated into one or more connected processors in the underlying processor topology.
+All these transformation methods can be chained together to compose a complex processor topology.
+Since <code>KStream</code> and <code>KTable</code> are strongly typed, all these transformation operations are defined as
+generic functions where users can specify the input and output data types.
+</p>
+
+<p>
+Among these transformations, <code>filter</code>, <code>map</code>, <code>mapValues</code>, etc., are stateless
+transformation operations that can be applied to both <code>KStream</code> and <code>KTable</code>,
+where users usually pass a customized function as a parameter, such as a <code>Predicate</code> for <code>filter</code>
+or a <code>KeyValueMapper</code> for <code>map</code>:
+
+</p>
+
+<pre>
+    // written in Java 8+, using lambda expressions
+    KStream<String, Object> mapped = source1.mapValues(record -> record.get("category"));
+</pre>
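+<p>
+Similarly, a stateless <code>filter</code> takes a <code>Predicate</code> over the record key and value. Here is a small sketch building on the <code>source1</code> stream above; the "category" field is an example only:
+</p>
+<pre>
+    // written in Java 8+, using lambda expressions
+    KStream<String, GenericRecord> filtered = source1.filter((key, record) -> record.get("category") != null);
+</pre>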
+
+<p>
+Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
+they do not require a state store associated with the stream processor; stateful transformations, on the other hand,
+require accessing an associated state store for processing and producing outputs.
+For example, in <code>join</code> and <code>aggregate</code> operations, a windowing state is usually used to store all the records
+received so far within the defined window boundaries. The operators can then access these accumulated records in the store and compute
+based on them.
+</p>
+
+<pre>
+    // written in Java 8+, using lambda expressions
+    KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
+        () -> 0L,  // initial value
+        (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
+        TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
+        Serdes.Long() // serde for aggregated value
+    );
+
+    KStream<String, String> joined = source1.leftJoin(source2,
+        (record1, record2) -> record1.get("user") + "-" + record2.get("region"));
+</pre>
+
+<h5><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h5>
+
+<p>
+At the end of the processing, users can choose to (continuously) write the final result streams back to a Kafka topic through
+<code>KStream.to</code> and <code>KTable.to</code>.
+</p>
+
+<pre>
+    joined.to("topic4");
+</pre>
+
+If your application needs to continue reading and processing the records after they have been materialized
+to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic;
+Kafka Streams provides a convenience method called <code>through</code>:
+
+<pre>
+    // equivalent to
+    //
+    // joined.to("topic4");
+    // materialized = builder.stream("topic4");
+    KStream<String, String> materialized = joined.through("topic4");
+</pre>
+
+
+<br>
+<p>
+Besides defining the topology, developers will also need to configure their applications
+in <code>StreamsConfig</code> before running them. A complete list of
+Kafka Streams configs can be found <a href="#streamsconfigs"><b>here</b></a>.
+</p>
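+<p>
+As a minimal sketch, configuring and starting an application could look like the following; the application id, bootstrap servers and default serdes shown here are example values only, and <code>builder</code> refers to the <code>KStreamBuilder</code> (or <code>TopologyBuilder</code>) constructed as shown above.
+</p>
+<pre>
+    Properties props = new Properties();
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // unique per application
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // Kafka broker(s) to bootstrap from
+    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+    KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
+    streams.start();
+
+    // ... and eventually, when shutting down the application:
+    streams.close();
+</pre>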

