kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka-site git commit: doc changes included as part of 0.10.0.0-rc5
Date Mon, 16 May 2016 21:38:39 GMT
Repository: kafka-site
Updated Branches:
  refs/heads/asf-site 76217f0b9 -> 1d077ee01


doc changes included as part of 0.10.0.0-rc5


Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/1d077ee0
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/1d077ee0
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/1d077ee0

Branch: refs/heads/asf-site
Commit: 1d077ee01a68061a26c25484513f025c2ce7aa9b
Parents: 76217f0
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Mon May 16 14:37:33 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon May 16 14:37:33 2016 -0700

----------------------------------------------------------------------
 0100/api.html                      |  19 ++++++
 0100/connect.html                  | 103 +++++++++++++++++++++++++++-----
 0100/documentation.html            |   1 +
 0100/generated/streams_config.html |   2 +-
 0100/implementation.html           |  62 +++++++++++--------
 0100/ops.html                      |  11 ++++
 0100/quickstart.html               |  18 +++---
 0100/upgrade.html                  |  19 ++++++
 0100/uses.html                     |   2 +-
 9 files changed, 185 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/api.html
----------------------------------------------------------------------
diff --git a/0100/api.html b/0100/api.html
index 8d5be9b..c457241 100644
--- a/0100/api.html
+++ b/0100/api.html
@@ -165,3 +165,22 @@ This new unified consumer API removes the distinction between the 0.8
high-level
 
 Examples showing how to use the consumer are given in the
 <a href="http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html"
title="Kafka 0.9.0 Javadoc">javadocs</a>.
+
+<h3><a id="streamsapi" href="#streamsapi">2.3 Streams API</a></h3>
+
+As of the 0.10.0 release we have added a new client library named <b>Kafka Streams</b>
to let users implement their stream processing
+applications with data stored in Kafka topics. Kafka Streams is considered alpha quality
and its public APIs are likely to change in
+future releases.
+You can use Kafka Streams by adding a dependency on the streams jar using
+the following example maven co-ordinates (you can change the version numbers with new releases):
+
+<pre>
+	&lt;dependency&gt;
+	    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+	    &lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
+	    &lt;version&gt;0.10.0.0&lt;/version&gt;
+	&lt;/dependency&gt;
+</pre>
+
+Examples showing how to use this library are given in the
+<a href="http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html"
title="Kafka 0.10.0 Javadoc">javadocs</a> (note those classes annotated with <b>@InterfaceStability.Unstable</b>,
indicating that their public APIs may change without backward compatibility in future releases).
\ No newline at end of file
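For orientation, a minimal application using this library might look roughly like the sketch below; the topic names, application id, and broker address are illustrative assumptions, not part of the documented example:

<pre>
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PipeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // assumed application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic"); // copy records from one topic straight to another

        KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
        streams.start();
    }
}
</pre>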

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/connect.html
----------------------------------------------------------------------
diff --git a/0100/connect.html b/0100/connect.html
index c3cf583..4ba406e 100644
--- a/0100/connect.html
+++ b/0100/connect.html
@@ -53,15 +53,17 @@ Distributed mode handles automatic balancing of work, allows you to scale
up (or
 &gt; bin/connect-distributed.sh config/connect-distributed.properties
 </pre>
 
-The difference is in the class which is started and the configuration parameters which change
how the Kafka Connect process decides where to store configurations, how to assign work, and
where to store offsets. In particular, the following configuration parameters are critical
to set before starting your cluster:
+The difference is in the class which is started and the configuration parameters which change
how the Kafka Connect process decides where to store configurations, how to assign work, and
where to store offsets and task statuses. In distributed mode, Kafka Connect stores the
offsets, configs and task statuses in Kafka topics. It is recommended to manually create the
topics for offsets, configs and statuses in order to achieve the desired number of partitions
and replication factors. If the topics are not yet created when starting Kafka Connect, the
topics will be auto created with a default number of partitions and replication factor, which
may not be best suited for your usage.
 
+In particular, the following configuration parameters are critical to set before starting
your cluster:
 <ul>
     <li><code>group.id</code> (default <code>connect-cluster</code>)
- unique name for the cluster, used in forming the Connect cluster group; note that this <b>must
not conflict</b> with consumer group IDs</li>
-    <li><code>config.storage.topic</code> (default <code>connect-configs</code>)
- topic to use for storing connector and task configurations; note that this should be a single
partition, highly replicated topic</li>
-    <li><code>offset.storage.topic</code> (default <code>connect-offsets</code>)
- topic to use for ; this topic should have many partitions and be replicated</li>
+    <li><code>config.storage.topic</code> (default <code>connect-configs</code>)
- topic to use for storing connector and task configurations; note that this should be a single
partition, highly replicated topic. You may need to manually create the topic to ensure a single
partition for the config topic, as auto-created topics may have multiple partitions.</li>
+    <li><code>offset.storage.topic</code> (default <code>connect-offsets</code>)
- topic to use for storing offsets; this topic should have many partitions and be replicated</li>
+    <li><code>status.storage.topic</code> (default <code>connect-status</code>)
- topic to use for storing statuses; this topic can have multiple partitions and should be
replicated</li>
 </ul>
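For example, the three topics could be created up front with the topic tool before the workers are started; the partition counts and replication factor below are purely illustrative and should be chosen for your environment:

<pre>
&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --partitions 1 --replication-factor 3
&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --partitions 50 --replication-factor 3
&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --partitions 10 --replication-factor 3
</pre>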
 
-Note that in distributed mode the connector configurations are not passed on the command
line. Instead, use the REST API described below to create, modify, and destroy connectors.
+Note that in distributed mode the connector configurations are not passed on the command
line. Instead, use the REST API described below to create, modify, and destroy connectors.

 
 
 <h4><a id="connect_configuring" href="#connect_configuring">Configuring Connectors</a></h4>
@@ -98,6 +100,10 @@ Since Kafka Connect is intended to be run as a service, it also provides
a REST
     <li><code>GET /connectors/{name}/status</code> - get current status
of the connector, including if it is running, failed, paused, etc., which worker it is assigned
to, error information if it has failed, and the state of all its tasks</li>
     <li><code>GET /connectors/{name}/tasks</code> - get a list of tasks
currently running for a connector</li>
     <li><code>GET /connectors/{name}/tasks/{taskid}/status</code> - get
current status of the task, including if it is running, failed, paused, etc., which worker
it is assigned to, and error information if it has failed</li>
+    <li><code>PUT /connectors/{name}/pause</code> - pause the connector
and its tasks, which stops message processing until the connector is resumed</li>
+    <li><code>PUT /connectors/{name}/resume</code> - resume a paused connector
(or do nothing if the connector is not paused)</li>
+    <li><code>POST /connectors/{name}/restart</code> - restart a connector
(typically because it has failed)</li>
+    <li><code>POST /connectors/{name}/tasks/{taskId}/restart</code> - restart
an individual task (typically because it has failed)</li>
     <li><code>DELETE /connectors/{name}</code> - delete a connector, halting
all tasks and deleting its configuration</li>
 </ul>
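For example, assuming a worker listening on the default REST port 8083 and a connector named <code>local-file-source</code> (both assumptions for illustration), the pause, resume and status endpoints can be exercised with curl:

<pre>
&gt; curl -X PUT http://localhost:8083/connectors/local-file-source/pause
&gt; curl -X PUT http://localhost:8083/connectors/local-file-source/resume
&gt; curl http://localhost:8083/connectors/local-file-source/status
</pre>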
 
@@ -154,7 +160,7 @@ The easiest method to fill in is <code>getTaskClass()</code>,
which defines the
 
 <pre>
 @Override
-public Class<? extends Task> getTaskClass() {
+public Class&lt;? extends Task&gt; getTaskClass() {
     return FileStreamSourceTask.class;
 }
 </pre>
@@ -175,7 +181,7 @@ public void stop() {
 }
 </pre>
 
-Finally, the real core of the implementation is in <code>getTaskConfigs()</code>.
In this case we're only
+Finally, the real core of the implementation is in <code>getTaskConfigs()</code>.
In this case we are only
 handling a single file, so even though we may be permitted to generate more tasks as per
the
 <code>maxTasks</code> argument, we return a list with only one entry:
 
@@ -221,7 +227,7 @@ public class FileStreamSourceTask extends SourceTask&lt;Object, Object&gt;
{
 
     @Override
     public synchronized void stop() {
-        stream.close()
+        stream.close();
     }
 </pre>
 
@@ -237,8 +243,8 @@ public List&lt;SourceRecord&gt; poll() throws InterruptedException
{
         while (streamValid(stream) &amp;&amp; records.isEmpty()) {
             LineAndOffset line = readToNextLine(stream);
             if (line != null) {
-                Map<String, Object> sourcePartition = Collections.singletonMap("filename",
filename);
-                Map<String, Object> sourceOffset = Collections.singletonMap("position",
streamOffset);
+                Map&lt;String, Object&gt; sourcePartition = Collections.singletonMap("filename",
filename);
+                Map&lt;String, Object&gt; sourceOffset = Collections.singletonMap("position",
streamOffset);
                 records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA,
line));
             } else {
                 Thread.sleep(1);
@@ -263,11 +269,13 @@ The previous section described how to implement a simple <code>SourceTask</code>
 
 <pre>
 public abstract class SinkTask implements Task {
-public void initialize(SinkTaskContext context) { ... }
-
-public abstract void put(Collection&lt;SinkRecord&gt; records);
+    public void initialize(SinkTaskContext context) {
+        this.context = context;
+    }
 
-public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
+    public abstract void put(Collection&lt;SinkRecord&gt; records);
+     
+    public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
 </pre>
 
 The <code>SinkTask</code> documentation contains full details, but this interface
is nearly as simple as the <code>SourceTask</code>. The <code>put()</code>
method should contain most of the implementation, accepting sets of <code>SinkRecords</code>,
performing any required translation, and storing them in the destination system. This method
does not need to ensure the data has been fully written to the destination system before returning.
In fact, in many cases internal buffering will be useful so an entire batch of records can
be sent at once, reducing the overhead of inserting events into the downstream data store.
The <code>SinkRecords</code> contain essentially the same information as <code>SourceRecords</code>:
Kafka topic, partition, offset and the event key and value.
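As a rough sketch only (not the bundled FileStreamSinkTask), a <code>put()</code> that buffers records and a <code>flush()</code> matching the signature above might look like the following; <code>buffer</code> and <code>writeBufferedRecords()</code> are assumed helpers:

<pre>
private final List&lt;SinkRecord&gt; buffer = new ArrayList&lt;&gt;();

@Override
public void put(Collection&lt;SinkRecord&gt; records) {
    // Accumulate records so a whole batch can be written to the destination at once.
    buffer.addAll(records);
}

@Override
public void flush(Map&lt;TopicPartition, Long&gt; offsets) {
    // Write everything buffered so far to the destination system (assumed helper),
    // then clear the buffer; the offsets correspond to records already passed to put().
    writeBufferedRecords(buffer);
    buffer.clear();
}
</pre>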
@@ -301,8 +309,8 @@ Kafka Connect is intended to define bulk data copying jobs, such as copying
an e
 Source connectors need to monitor the source system for changes, e.g. table additions/deletions
in a database. When they pick up changes, they should notify the framework via the <code>ConnectorContext</code>
object that reconfiguration is necessary. For example, in a <code>SourceConnector</code>:
 
 <pre>
-if (inputsChanged())
-    this.context.requestTaskReconfiguration();
+    if (inputsChanged())
+        this.context.requestTaskReconfiguration();
 </pre>
 
 The framework will promptly request new configuration information and update the tasks, allowing
them to gracefully commit their progress before reconfiguring them. Note that in the <code>SourceConnector</code>
this monitoring is currently left up to the connector implementation. If an extra thread is
required to perform this monitoring, the connector must allocate it itself.
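As a hedged illustration of allocating such a thread inside a <code>SourceConnector</code> (the polling interval and the <code>inputsChanged()</code> check are assumptions, reusing the snippet above):

<pre>
private Thread monitor;

@Override
public void start(Map&lt;String, String&gt; props) {
    monitor = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            if (inputsChanged())                      // hypothetical check of the source system
                context.requestTaskReconfiguration(); // ask the framework to reconfigure tasks
            try {
                Thread.sleep(60 * 1000L);             // poll once a minute (illustrative)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
    monitor.setDaemon(true);
    monitor.start();
}
</pre>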
@@ -311,6 +319,26 @@ Ideally this code for monitoring changes would be isolated to the <code>Connecto
 
 <code>SinkConnectors</code> usually only have to handle the addition of streams,
which may translate to new entries in their outputs (e.g., a new database table). The framework
manages any changes to the Kafka input, such as when the set of input topics changes because
of a regex subscription. <code>SinkTasks</code> should expect new input streams,
which may require creating new resources in the downstream system, such as a new table in
a database. The trickiest situation to handle in these cases may be conflicts between multiple
<code>SinkTasks</code> seeing a new input stream for the first time and simultaneously
trying to create the new resource. <code>SinkConnectors</code>, on the other hand,
will generally require no special code for handling a dynamic set of streams.
 
+<h4><a id="connect_configs" href="#connect_configs">Connect Configuration Validation</a></h4>
+
+Kafka Connect allows you to validate connector configurations before submitting a connector
to be executed and can provide feedback about errors and recommended values. To take advantage
of this, connector developers need to provide an implementation of <code>config()</code>
to expose the configuration definition to the framework.
+
+The following code in <code>FileStreamSourceConnector</code> defines the configuration
and exposes it to the framework.
+
+<pre>
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
+        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+</pre>
+
+The <code>ConfigDef</code> class is used for specifying the set of expected configurations.
For each configuration, you can specify the name, the type, the default value, the documentation,
the group information, the order in the group, the width of the configuration value and the
name suitable for display in the UI. Plus, you can provide special validation logic used for
single configuration validation by providing an implementation of <code>Validator</code>.
Moreover, there may be dependencies between configurations; for example, the valid values
and visibility of a configuration may change according to the values of other configurations.
To handle this, <code>ConfigDef</code> allows you to specify the dependents of
a configuration and to provide an implementation of <code>Recommender</code> to
get valid values and set visibility of a configuration given the current configuration values.
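For example, building on the <code>FileStreamSourceConnector</code> definition above, a single-configuration validator can be attached when a setting is defined (the <code>batch.size</code> setting here is hypothetical):

<pre>
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to")
        .define("batch.size", Type.INT, 100, ConfigDef.Range.atLeast(1), Importance.MEDIUM,
                "Maximum number of records to read in one batch.");
</pre>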
+
+Also, the <code>validate()</code> method in <code>Connector</code>
provides a default validation implementation which returns a list of allowed configurations
together with configuration errors and recommended values for each configuration. However,
it does not use the recommended values for configuration validation. You may provide an override
of the default implementation for customized configuration validation, which may use the recommended
values.
+
 <h4><a id="connect_schemas" href="#connect_schemas">Working with Schemas</a></h4>
 
 The FileStream connectors are good examples because they are simple, but they also have trivially
structured data -- each line is just a string. Almost all practical connectors will need schemas
with more complex data formats.
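As a sketch of what working with the Connect data API looks like (the schema name and fields below are illustrative, not taken from the FileStream connectors), a connector handling structured records might build them like this:

<pre>
Schema schema = SchemaBuilder.struct().name("com.example.Person")
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT32_SCHEMA)
    .build();

Struct struct = new Struct(schema)
    .put("name", "Alice")
    .put("age", 30);
</pre>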
@@ -338,3 +366,48 @@ However, many connectors will have dynamic schemas. One simple example
of this i
 
 Sink connectors are usually simpler because they are consuming data and therefore do not
need to create schemas. However, they should take just as much care to validate that the schemas
they receive have the expected format. When the schema does not match -- usually indicating
the upstream producer is generating invalid data that cannot be correctly translated to the
destination system -- sink connectors should throw an exception to indicate this error to
the system.
 
+<h4><a id="connect_administration" href="#connect_administration">Kafka Connect
Administration</a></h4>
+
+<p>
+Kafka Connect's <a href="#connect_rest">REST layer</a> provides a set of APIs
to enable administration of the cluster. This includes APIs to view the configuration of connectors
and the status of their tasks, as well as to alter their current behavior (e.g. changing configuration
and restarting tasks).
+</p>
+
+<p>
+When a connector is first submitted to the cluster, the workers rebalance the full set of
connectors in the cluster and their tasks so that each worker has approximately the same amount
of work. This same rebalancing procedure is also used when connectors increase or decrease
the number of tasks they require, or when a connector's configuration is changed. You can
use the REST API to view the current status of a connector and its tasks, including the id
of the worker to which each was assigned. For example, querying the status of a file source
(using <code>GET /connectors/file-source/status</code>) might produce output like
the following:
+</p>
+
+<pre>
+{
+  "name": "file-source",
+  "connector": {
+    "state": "RUNNING",
+    "worker_id": "192.168.1.208:8083"
+  },
+  "tasks": [
+    {
+      "id": 0,
+      "state": "RUNNING",
+      "worker_id": "192.168.1.209:8083"
+    }
+  ]
+}
+</pre>
+
+<p>
+Connectors and their tasks publish status updates to a shared topic (configured with <code>status.storage.topic</code>)
which all workers in the cluster monitor. Because the workers consume this topic asynchronously,
there is typically a (short) delay before a state change is visible through the status API.
The following states are possible for a connector or one of its tasks:
+</p>
+
+<ul>
+  <li><b>UNASSIGNED:</b> The connector/task has not yet been assigned to
a worker.</li>
+  <li><b>RUNNING:</b> The connector/task is running.</li>
+  <li><b>PAUSED:</b> The connector/task has been administratively paused.</li>
+  <li><b>FAILED:</b> The connector/task has failed (usually by raising
an exception, which is reported in the status output).</li>
+</ul>
+
+<p>
+In most cases, connector and task states will match, though they may be different for short
periods of time when changes are occurring or if tasks have failed. For example, when a connector
is first started, there may be a noticeable delay before the connector and its tasks have
all transitioned to the RUNNING state. States will also diverge when tasks fail since Connect
does not automatically restart failed tasks. To restart a connector/task manually, you can
use the restart APIs listed above. Note that if you try to restart a task while a rebalance
is taking place, Connect will return a 409 (Conflict) status code. You can retry after the
rebalance completes, but it might not be necessary since rebalances effectively restart all
the connectors and tasks in the cluster.
+</p>
+
+<p>
+It's sometimes useful to temporarily stop the message processing of a connector. For example,
if the remote system is undergoing maintenance, it would be preferable for source connectors
to stop polling it for new data instead of filling logs with exception spam. For this use
case, Connect offers a pause/resume API. While a source connector is paused, Connect will
stop polling it for additional records. While a sink connector is paused, Connect will stop
pushing new messages to it. The pause state is persistent, so even if you restart the cluster,
the connector will not begin message processing again until the task has been resumed. Note
that there may be a delay before all of a connector's tasks have transitioned to the PAUSED
state since it may take time for them to finish whatever processing they were in the middle
of when being paused. Additionally, failed tasks will not transition to the PAUSED state until
they have been restarted.
+</p>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/documentation.html
----------------------------------------------------------------------
diff --git a/0100/documentation.html b/0100/documentation.html
index 70002ab..ddc3102 100644
--- a/0100/documentation.html
+++ b/0100/documentation.html
@@ -40,6 +40,7 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>,
<a href="/08/documen
                       <li><a href="#simpleconsumerapi">2.2.2 Old Simple Consumer
API</a>
                       <li><a href="#newconsumerapi">2.2.3 New Consumer API</a>
                   </ul>
+              <li><a href="#streamsapi">2.3 Streams API</a>
           </ul>
     </li>
     <li><a href="#configuration">3. Configuration</a>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/generated/streams_config.html
----------------------------------------------------------------------
diff --git a/0100/generated/streams_config.html b/0100/generated/streams_config.html
index bfa2d74..73c7712 100644
--- a/0100/generated/streams_config.html
+++ b/0100/generated/streams_config.html
@@ -10,7 +10,7 @@
 <tr>
 <td>application.id</td><td>An identifier for the stream processing application.
Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2)
the group-id for membership management, 3) the changelog topic prefix.</td><td>string</td><td></td><td></td><td>high</td></tr>
 <tr>
-<td>bootstrap.servers</td><td>A list of host/port pairs to use for establishing
the initial connection to the Kafka cluster. The client will make use of all servers irrespective
of which servers are specified here for bootstrapping&mdash;this list only impacts the
initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>.
Since these servers are just used for the initial connection to discover the full cluster
membership (which may change dynamically), this list need not contain the full set of servers
(you may want more than one, though, in case a server is down).</td><td>string</td><td></td><td></td><td>high</td></tr>
+<td>bootstrap.servers</td><td>A list of host/port pairs to use for establishing
the initial connection to the Kafka cluster. The client will make use of all servers irrespective
of which servers are specified here for bootstrapping&mdash;this list only impacts the
initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>.
Since these servers are just used for the initial connection to discover the full cluster
membership (which may change dynamically), this list need not contain the full set of servers
(you may want more than one, though, in case a server is down).</td><td>list</td><td></td><td></td><td>high</td></tr>
 <tr>
 <td>client.id</td><td>An id string to pass to the server when making requests.
The purpose of this is to be able to track the source of requests beyond just ip/port by allowing
a logical application name to be included in server-side request logging.</td><td>string</td><td>""</td><td></td><td>high</td></tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/implementation.html
----------------------------------------------------------------------
diff --git a/0100/implementation.html b/0100/implementation.html
index 0a36c22..16ba07a 100644
--- a/0100/implementation.html
+++ b/0100/implementation.html
@@ -144,33 +144,37 @@ The network layer is a fairly straight-forward NIO server, and will
not be descr
 </p>
 <h3><a id="messages" href="#messages">5.3 Messages</a></h3>
 <p>
-Messages consist of a fixed-size header and variable length opaque byte array payload. The
header contains a format version and a CRC32 checksum to detect corruption or truncation.
Leaving the payload opaque is the right decision: there is a great deal of progress being
made on serialization libraries right now, and any particular choice is unlikely to be right
for all uses. Needless to say a particular application using Kafka would likely mandate a
particular serialization type as part of its usage. The <code>MessageSet</code>
interface is simply an iterator over messages with specialized methods for bulk reading and
writing to an NIO <code>Channel</code>.
+Messages consist of a fixed-size header, a variable length opaque key byte array and a variable
length opaque value byte array. The header contains the following fields:
+<ul>
+    <li> A CRC32 checksum to detect corruption or truncation. </li>
+    <li> A format version. </li>
+    <li> An attributes identifier. </li>
+    <li> A timestamp. </li>
+</ul>
+Leaving the key and value opaque is the right decision: there is a great deal of progress
being made on serialization libraries right now, and any particular choice is unlikely to
be right for all uses. Needless to say a particular application using Kafka would likely mandate
a particular serialization type as part of its usage. The <code>MessageSet</code>
interface is simply an iterator over messages with specialized methods for bulk reading and
writing to an NIO <code>Channel</code>.
 
 <h3><a id="messageformat" href="#messageformat">5.4 Message Format</a></h3>
 
 <pre>
-	/**
-	 * A message. The format of an N byte message is the following:
-	 *
-	 * If magic byte is 0
-	 *
-	 * 1. 1 byte "magic" identifier to allow format changes
-	 *
-	 * 2. 4 byte CRC32 of the payload
-	 *
-	 * 3. N - 5 byte payload
-	 *
-	 * If magic byte is 1
-	 *
-	 * 1. 1 byte "magic" identifier to allow format changes
-	 *
-	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of
the version (e.g. compression enabled, type of codec used)
-	 *
-	 * 3. 4 byte CRC32 of the payload
-	 *
-	 * 4. N - 6 byte payload
-	 *
-	 */
+    /**
+     * 1. 4 byte CRC32 of the message
+     * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
+     * 3. 1 byte "attributes" identifier to allow annotations on the message independent
of the version
+     *    bit 0 ~ 2 : Compression codec.
+     *      0 : no compression
+     *      1 : gzip
+     *      2 : snappy
+     *      3 : lz4
+     *    bit 3 : Timestamp type
+     *      0 : create time
+     *      1 : log append time
+     *    bit 4 ~ 7 : reserved
+     * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
+     * 5. 4 byte key length, containing length K
+     * 6. K byte key
+     * 7. 4 byte payload length, containing length V
+     * 8. V byte payload
+     */
 </pre>
 </p>
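To make the attributes layout above concrete, a reader of the attributes byte could extract the two fields with plain bit arithmetic (illustrative code, not taken from the Kafka source):

<pre>
int compressionCodec = attributes &amp; 0x07;                 // bits 0-2: 0=none, 1=gzip, 2=snappy, 3=lz4
int timestampType    = (attributes &amp; 0x08) == 0 ? 0 : 1;  // bit 3: 0=create time, 1=log append time
</pre>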
 <h3><a id="log" href="#log">5.5 Log</a></h3>
@@ -183,10 +187,16 @@ The exact binary format for messages is versioned and maintained as
a standard i
 <pre>
 On-disk format of a message
 
-message length : 4 bytes (value: 1+4+n)
-"magic" value  : 1 byte
+offset         : 8 bytes 
+message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
 crc            : 4 bytes
-payload        : n bytes
+magic value    : 1 byte
+attributes     : 1 byte
+timestamp      : 8 bytes (Only exists when magic value is greater than zero)
+key length     : 4 bytes
+key            : K bytes
+value length   : 4 bytes
+value          : V bytes
 </pre>
 <p>
 The use of the message offset as the message id is unusual. Our original idea was to use
a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker.
But since a consumer must maintain an ID for each server, the global uniqueness of the GUID
provides no value. Furthermore the complexity of maintaining the mapping from a random id
to an offset requires a heavy weight index structure which must be synchronized with disk,
essentially requiring a full persistent random-access data structure. Thus to simplify the
lookup structure we decided to use a simple per-partition atomic counter which could be coupled
with the partition id and node id to uniquely identify a message; this makes the lookup structure
simpler, though multiple seeks per consumer request are still likely. However once we settled
on a counter, the jump to directly using the offset seemed natural&mdash;both after all
are monotonically increasing integers unique to a partition. Since the offset is hidden from
the consumer API this decision is ultimately an implementation detail and
we went with the more efficient approach.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/ops.html
----------------------------------------------------------------------
diff --git a/0100/ops.html b/0100/ops.html
index f64a701..faf5453 100644
--- a/0100/ops.html
+++ b/0100/ops.html
@@ -98,6 +98,17 @@ Since running this command can be tedious you can also configure Kafka
to do thi
     auto.leader.rebalance.enable=true
 </pre>
 
+<h4><a id="basic_ops_racks" href="#basic_ops_racks">Balancing Replicas Across
Racks</a></h4>
+The rack awareness feature spreads replicas of the same partition across different racks.
This extends the guarantees Kafka provides for broker-failure to cover rack-failure, limiting
the risk of data loss should all the brokers on a rack fail at once. The feature can also
be applied to other broker groupings such as availability zones in EC2.
+<p></p>
+You can specify that a broker belongs to a particular rack by adding a property to the broker
config:
+<pre>   broker.rack=my-rack-id</pre>
+When a topic is <a href="#basic_ops_add_topic">created</a>, <a href="#basic_ops_modify_topic">modified</a>
or replicas are <a href="#basic_ops_cluster_expansion">redistributed</a>, the
rack constraint will be honoured, ensuring replicas span as many racks as they can (a partition
will span min(#racks, replication-factor) different racks).
+<p></p>
+The algorithm used to assign replicas to brokers ensures that the number of leaders per broker
will be constant, regardless of how brokers are distributed across racks. This ensures balanced
throughput.
+<p></p>
+However if racks are assigned different numbers of brokers, the assignment of replicas will
not be even. Racks with fewer brokers will get more replicas, meaning they will use more storage
and put more resources into replication. Hence it is sensible to configure an equal number
of brokers per rack.
+
 <h4><a id="basic_ops_mirror_maker" href="#basic_ops_mirror_maker">Mirroring data
between clusters</a></h4>
 
 We refer to the process of replicating data <i>between</i> Kafka clusters "mirroring"
to avoid confusion with the replication that happens amongst the nodes in a single cluster.
Kafka comes with a tool for mirroring data between Kafka clusters. The tool reads from a source
cluster and writes to a destination cluster, like this:

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/quickstart.html
----------------------------------------------------------------------
diff --git a/0100/quickstart.html b/0100/quickstart.html
index 7a923c6..4d4f7ea 100644
--- a/0100/quickstart.html
+++ b/0100/quickstart.html
@@ -258,15 +258,15 @@ This quickstart example will demonstrate how to run a streaming application
code
 of the <code>WordCountDemo</code> example code (converted to use Java 8 lambda
expressions for easy reading).
 </p>
 <pre>
-KStream<String, Long> wordCounts = textLines
-// Split each text line, by whitespace, into words.
-.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
-// Ensure the words are available as message keys for the next aggregate operation.
-.map((key, value) -> new KeyValue<>(value, value))
-// Count the occurrences of each word (message key).
-.countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts")
-// Convert the resulted aggregate table into another stream.
-.toStream();
+KTable<String, Long> wordCounts = textLines
+    // Split each text line, by whitespace, into words.
+    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+
+    // Ensure the words are available as record keys for the next aggregate operation.
+    .map((key, value) -> new KeyValue<>(value, value))
+
+    // Count the occurrences of each word (record key) and store the results into a table
named "Counts".
+    .countByKey("Counts")
 </pre>
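For orientation, the code around this snippet (not shown in the excerpt) roughly reads the input topic into <code>textLines</code>, writes the resulting table back out, and starts the topology; the serde variables and topic names below are assumptions based on the demo:

<pre>
KStreamBuilder builder = new KStreamBuilder();
KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");

// ... the wordCounts table built as shown above ...

wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");

KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
streams.start();
</pre>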
 
 <p>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/upgrade.html
----------------------------------------------------------------------
diff --git a/0100/upgrade.html b/0100/upgrade.html
index 486954c..d09b9d7 100644
--- a/0100/upgrade.html
+++ b/0100/upgrade.html
@@ -63,6 +63,24 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded
to 0.9
     message format version. Otherwise consumers before 0.10.0.0 might break. In particular,
after the message format
     is set to 0.10.0, one should not change it back to an earlier format as it may break
consumers on versions before 0.10.0.0.
 </p>
+<p>
+    <b>Note:</b> Due to the additional timestamp introduced in each message,
producers sending small messages may see a
+    message throughput degradation because of the increased overhead.
+    Likewise, replication now transmits an additional 8 bytes per message.
+    If you're running close to the network capacity of your cluster, it's possible that you'll
overwhelm the network cards
+    and see failures and performance issues due to the overload.
+</p>
+<p>
+    <b>Note:</b> If you have enabled compression on producers, you may notice
reduced producer throughput and/or
+    lower compression rate on the broker in some cases. When receiving compressed messages,
0.10.0
+    brokers avoid recompressing the messages, which in general reduces the latency and improves
the throughput. In
+    certain cases, however, this may reduce the batching size on the producer, which could
lead to worse throughput. If this
+    happens, users can tune linger.ms and batch.size of the producer for better throughput.
In addition, the producer buffer
+    used for compressing messages with snappy is smaller than the one used by the broker,
which may have a negative
+    impact on the compression ratio for the messages on disk. We intend to make this configurable
in a future Kafka
+    release.
+</p>
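For example, a starting point for such tuning might be the following producer settings; the values are purely illustrative and need to be validated against your own workload:
<pre>
    linger.ms=10
    batch.size=65536
</pre>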
 
 <h5><a id="upgrade_10_breaking" href="#upgrade_10_breaking">Potential breaking
changes in 0.10.0.0</a></h5>
 <ul>
@@ -90,6 +108,7 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded
to 0.9
 <h5><a id="upgrade_10_notable" href="#upgrade_10_notable">Notable changes in
0.10.0.0</a></h5>
 
 <ul>
+    <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b>
is available for stream processing on data stored in Kafka topics. This new client library
only works with 0.10.x and upward versioned brokers due to message format changes mentioned
above. For more information please read <a href="#streams_overview">this section</a>.</li>
     <li> The default value of the configuration parameter <code>receive.buffer.bytes</code>
is now 64K for the new consumer.</li>
     <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code>
to restrict internal topics (such as the consumer offsets topic) from accidentally being included
in regular expression subscriptions. By default, it is enabled.</li>
     <li> The old Scala producer has been deprecated. Users should migrate their code
to the Java producer included in the kafka-clients JAR as soon as possible. </li>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/1d077ee0/0100/uses.html
----------------------------------------------------------------------
diff --git a/0100/uses.html b/0100/uses.html
index f769bed..5b97272 100644
--- a/0100/uses.html
+++ b/0100/uses.html
@@ -45,7 +45,7 @@ In comparison to log-centric systems like Scribe or Flume, Kafka offers
equally
 
 <h4><a id="uses_streamprocessing" href="#uses_streamprocessing">Stream Processing</a></h4>
 
-Many users end up doing stage-wise processing of data where data is consumed from topics
of raw data and then aggregated, enriched, or otherwise transformed into new Kafka topics
for further consumption. For example a processing flow for article recommendation might crawl
article content from RSS feeds and publish it to an "articles" topic; further processing might
help normalize or deduplicate this content to a topic of cleaned article content; a final
stage might attempt to match this content to users. This creates a graph of real-time data
flow out of the individual topics. <a href="https://storm.apache.org/">Storm</a>
and <a href="http://samza.apache.org/">Samza</a> are popular frameworks for implementing
these kinds of transformations.
+Many users of Kafka process data in processing pipelines consisting of multiple stages, where
raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed
into new topics for further consumption or follow-up processing. For example, a processing
pipeline for recommending news articles might crawl article content from RSS feeds and publish
it to an "articles" topic; further processing might normalize or deduplicate this content
and publish the cleansed article content to a new topic; a final processing stage might
attempt to recommend this content to users. Such processing pipelines create graphs of real-time
data flows based on the individual topics. Starting in 0.10.0.0, a light-weight but powerful
stream processing library called <a href="#streams_overview">Kafka Streams</a>
is available in Apache Kafka to perform such data processing as described above. Apart from
Kafka Streams, alternative open source stream processing tools include
<a href="https://storm.apache.org/">Apache Storm</a> and <a href="http://samza.apache.org/">Apache
Samza</a>.
 
 <h4><a id="uses_eventsourcing" href="#uses_eventsourcing">Event Sourcing</a></h4>
 

