kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [5/9] kafka-site git commit: Add 0.10.1 docs
Date Tue, 04 Oct 2016 21:26:33 GMT
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ed0bb0d9/0101/implementation.html
----------------------------------------------------------------------
diff --git a/0101/implementation.html b/0101/implementation.html
new file mode 100644
index 0000000..12846fb
--- /dev/null
+++ b/0101/implementation.html
@@ -0,0 +1,405 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<h3><a id="apidesign" href="#apidesign">5.1 API Design</a></h3>
+
+<h4><a id="impl_producer" href="#impl_producer">Producer APIs</a></h4>
+
+<p>
+The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code>
and <code>kafka.producer.async.AsyncProducer</code>.
+<pre>
+class Producer<T> {
+
+  /* Sends the data, partitioned by key to the topic using either the */
+  /* synchronous or the asynchronous producer */
+  public void send(kafka.javaapi.producer.ProducerData&lt;K,V&gt; producerData);
+
+  /* Sends a list of data, partitioned by key to the topic using either */
+  /* the synchronous or the asynchronous producer */
+  public void send(java.util.List&lt;kafka.javaapi.producer.ProducerData&lt;K,V&gt;&gt;
producerData);
+
+  /* Closes the producer and cleans up */
+  public void close();
+
+}
+</pre>
+
+The goal is to expose all the producer functionality through a single API to the client.
+
+The Kafka producer
+<ul>
+<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch
of the batched data:
+<p><code>kafka.producer.Producer</code> provides the ability to batch multiple
produce requests (<code>producer.type=async</code>), before serializing and dispatching
them to the appropriate kafka broker partition. The size of the batch can be controlled by
a few config parameters. As events enter a queue, they are buffered in a queue, until either
<code>queue.time</code> or <code>batch.size</code> is reached. A background
thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch
of data and lets the <code>kafka.producer.EventHandler</code> serialize and send
the data to the appropriate kafka broker partition. A custom event handler can be plugged
in through the <code>event.handler</code> config parameter. At various stages
of this producer queue pipeline, it is helpful to be able to inject callbacks, either for
plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing
the <code>kafka.producer.async.CallbackHandler</c
 ode> interface and setting <code>callback.handler</code> config parameter
to that class.
+</p>
+</li>
+<li>handles the serialization of data through a user-specified <code>Encoder</code>:
+<pre>
+interface Encoder&lt;T&gt; {
+  public Message toMessage(T data);
+}
+</pre>
+<p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code></p>
+</li>
+<li>provides software load balancing through an optionally user-specified <code>Partitioner</code>:
+<p>
+The routing decision is influenced by the <code>kafka.producer.Partitioner</code>.
+<pre>
+interface Partitioner&lt;T&gt; {
+   int partition(T key, int numPartitions);
+}
+</pre>
+The partition API uses the key and the number of available broker partitions to return a
partition id. This id is used as an index into a sorted list of broker_ids and partitions
to pick a broker partition for the producer request. The default partitioning strategy is
<code>hash(key)%numPartitions</code>. If the key is null, then a random broker
partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code>
config parameter.
+</p>
+</li>
+</ul>
+</p>
+
+<h4><a id="impl_consumer" href="#impl_consumer">Consumer APIs</a></h4>
+<p>
+We have 2 levels of consumer APIs. The low-level "simple" API maintains a connection to a
single broker and has a close correspondence to the network requests sent to the server. This
API is completely stateless, with the offset being passed in on every request, allowing the
user to maintain this metadata however they choose.
+</p>
+<p>
+The high-level API hides the details of brokers from the consumer and allows consuming off
the cluster of machines without concern for the underlying topology. It also maintains the
state of what has been consumed. The high-level API also provides the ability to subscribe
to topics that match a filter expression (i.e., either a whitelist or a blacklist regular
expression).
+</p>
+
+<h5><a id="impl_lowlevel" href="#impl_lowlevel">Low-level API</a></h5>
+<pre>
+class SimpleConsumer {
+
+  /* Send fetch request to a broker and get back a set of messages. */
+  public ByteBufferMessageSet fetch(FetchRequest request);
+
+  /* Send a list of fetch requests to a broker and get back a response set. */
+  public MultiFetchResponse multifetch(List&lt;FetchRequest&gt; fetches);
+
+  /**
+   * Get a list of valid offsets (up to maxSize) before the given time.
+   * The result is a list of offsets, in descending order.
+   * @param time: time in millisecs,
+   *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset
available.
+   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest
offset available.
+   */
+  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
+}
+</pre>
+
+The low-level API is used to implement the high-level API as well as being used directly
for some of our offline consumers which have particular requirements around maintaining state.
+
+<h5><a id="impl_highlevel" href="#impl_highlevel">High-level API</a></h5>
+<pre>
+
+/* create a connection to the cluster */
+ConsumerConnector connector = Consumer.create(consumerConfig);
+
+interface ConsumerConnector {
+
+  /**
+   * This method is used to get a list of KafkaStreams, which are iterators over
+   * MessageAndMetadata objects from which you can obtain messages and their
+   * associated metadata (currently only topic).
+   *  Input: a map of &lt;topic, #streams&gt;
+   *  Output: a map of &lt;topic, list of message streams&gt;
+   */
+  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt;
topicCountMap);
+
+  /**
+   * You can also obtain a list of KafkaStreams, that iterate over messages
+   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
+   * whitelist or a blacklist which is a standard Java regex.)
+   */
+  public List&lt;KafkaStream&gt; createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams);
+
+  /* Commit the offsets of all messages consumed so far. */
+  public commitOffsets()
+
+  /* Shut down the connector */
+  public shutdown()
+}
+</pre>
+<p>
+This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream
represents the stream of messages from one or more partitions on one or more servers. Each
stream is used for single threaded processing, so the client can provide the number of desired
streams in the create call. Thus a stream may represent the merging of multiple server partitions
(to correspond to the number of processing threads), but each partition only goes to one stream.
+</p>
+<p>
+The createMessageStreams call registers the consumer for the topic, which results in rebalancing
the consumer/broker assignment. The API encourages creating many topic streams in a single
call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally)
registers watchers to discover new topics that match its filter. Note that each stream that
createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e.,
if multiple topics are allowed by the filter).
+</p>
+
+<h3><a id="networklayer" href="#networklayer">5.2 Network Layer</a></h3>
+<p>
+The network layer is a fairly straight-forward NIO server, and will not be described in great
detail. The sendfile implementation is done by giving the <code>MessageSet</code>
interface a <code>writeTo</code> method. This allows the file-backed message set
to use the more efficient <code>transferTo</code> implementation instead of an
in-process buffered write. The threading model is a single acceptor thread and <i>N</i>
processor threads which handle a fixed number of connections each. This design has been pretty
thoroughly tested <a href="http://sna-projects.com/blog/2009/08/introducing-the-nio-socketserver-implementation">elsewhere</a>
and found to be simple to implement and fast. The protocol is kept quite simple to allow for
future implementation of clients in other languages.
+</p>
+<h3><a id="messages" href="#messages">5.3 Messages</a></h3>
+<p>
+Messages consist of a fixed-size header, a variable length opaque key byte array and a variable
length opaque value byte array. The header contains the following fields:
+<ul>
+    <li> A CRC32 checksum to detect corruption or truncation. <li/>
+    <li> A format version. </li>
+    <li> An attributes identifier </li>
+    <li> A timestamp </li>
+</ul>
+Leaving the key and value opaque is the right decision: there is a great deal of progress
being made on serialization libraries right now, and any particular choice is unlikely to
be right for all uses. Needless to say a particular application using Kafka would likely mandate
a particular serialization type as part of its usage. The <code>MessageSet</code>
interface is simply an iterator over messages with specialized methods for bulk reading and
writing to an NIO <code>Channel</code>.
+
+<h3><a id="messageformat" href="#messageformat">5.4 Message Format</a></h3>
+
+<pre>
+    /**
+     * 1. 4 byte CRC32 of the message
+     * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
+     * 3. 1 byte "attributes" identifier to allow annotations on the message independent
of the version
+     *    bit 0 ~ 2 : Compression codec.
+     *      0 : no compression
+     *      1 : gzip
+     *      2 : snappy
+     *      3 : lz4
+     *    bit 3 : Timestamp type
+     *      0 : create time
+     *      1 : log append time
+     *    bit 4 ~ 7 : reserved
+     * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
+     * 5. 4 byte key length, containing length K
+     * 6. K byte key
+     * 7. 4 byte payload length, containing length V
+     * 8. V byte payload
+     */
+</pre>
+</p>
+<h3><a id="log" href="#log">5.5 Log</a></h3>
+<p>
+A log for a topic named "my_topic" with two partitions consists of two directories (namely
<code>my_topic_0</code> and <code>my_topic_1</code>) populated with
data files containing the messages for that topic. The format of the log files is a sequence
of "log entries""; each log entry is a 4 byte integer <i>N</i> storing the message
length which is followed by the <i>N</i> message bytes. Each message is uniquely
identified by a 64-bit integer <i>offset</i> giving the byte position of the start
of this message in the stream of all messages ever sent to that topic on that partition. The
on-disk format of each message is given below. Each log file is named with the offset of the
first message it contains. So the first file created will be 00000000000.kafka, and each additional
file will have an integer name roughly <i>S</i> bytes from the previous file where
<i>S</i> is the max log file size given in the configuration.
+</p>
+<p>
+The exact binary format for messages is versioned and maintained as a standard interface
so message sets can be transferred between producer, broker, and client without recopying
or conversion when desirable. This format is as follows:
+</p>
+<pre>
+On-disk format of a message
+
+offset         : 8 bytes 
+message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
+crc            : 4 bytes
+magic value    : 1 byte
+attributes     : 1 byte
+timestamp      : 8 bytes (Only exists when magic value is greater than zero)
+key length     : 4 bytes
+key            : K bytes
+value length   : 4 bytes
+value          : V bytes
+</pre>
+<p>
+The use of the message offset as the message id is unusual. Our original idea was to use
a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker.
But since a consumer must maintain an ID for each server, the global uniqueness of the GUID
provides no value. Furthermore the complexity of maintaining the mapping from a random id
to an offset requires a heavy weight index structure which must be synchronized with disk,
essentially requiring a full persistent random-access data structure. Thus to simplify the
lookup structure we decided to use a simple per-partition atomic counter which could be coupled
with the partition id and node id to uniquely identify a message; this makes the lookup structure
simpler, though multiple seeks per consumer request are still likely. However once we settled
on a counter, the jump to directly using the offset seemed natural&mdash;both after all
are monotonically increasing integers unique to a partition. Since the offs
 et is hidden from the consumer API this decision is ultimately an implementation detail and
we went with the more efficient approach.
+</p>
+<img src="images/kafka_log.png">
+<h4><a id="impl_writes" href="#impl_writes">Writes</a></h4>
+<p>
+The log allows serial appends which always go to the last file. This file is rolled over
to a fresh file when it reaches a configurable size (say 1GB). The log takes two configuration
parameters: <i>M</i>, which gives the number of messages to write before forcing
the OS to flush the file to disk, and <i>S</i>, which gives a number of seconds
after which a flush is forced. This gives a durability guarantee of losing at most <i>M</i>
messages or <i>S</i> seconds of data in the event of a system crash.
+</p>
+<h4><a id="impl_reads" href="#impl_reads">Reads</a></h4>
+<p>
+Reads are done by giving the 64-bit logical offset of a message and an <i>S</i>-byte
max chunk size. This will return an iterator over the messages contained in the <i>S</i>-byte
buffer. <i>S</i> is intended to be larger than any single message, but in the
event of an abnormally large message, the read can be retried multiple times, each time doubling
the buffer size, until the message is read successfully. A maximum message and buffer size
can be specified to make the server reject messages larger than some size, and to give a bound
to the client on the maximum it needs to ever read to get a complete message. It is likely
that the read buffer ends with a partial message, this is easily detected by the size delimiting.
+</p>
+<p>
+The actual process of reading from an offset requires first locating the log segment file
in which the data is stored, calculating the file-specific offset from the global offset value,
and then reading from that file offset. The search is done as a simple binary search variation
against an in-memory range maintained for each file.
+</p>
+<p>
+The log provides the capability of getting the most recently written message to allow clients
to start subscribing as of "right now". This is also useful in the case the consumer fails
to consume its data within its SLA-specified number of days. In this case when the client
attempts to consume a non-existent offset it is given an OutOfRangeException and can either
reset itself or fail as appropriate to the use case.
+</p>
+
+<p> The following is the format of the results sent to the consumer.
+
+<pre>
+MessageSetSend (fetch result)
+
+total length     : 4 bytes
+error code       : 2 bytes
+message 1        : x bytes
+...
+message n        : x bytes
+</pre>
+
+<pre>
+MultiMessageSetSend (multiFetch result)
+
+total length       : 4 bytes
+error code         : 2 bytes
+messageSetSend 1
+...
+messageSetSend n
+</pre>
+<h4><a id="impl_deletes" href="#impl_deletes">Deletes</a></h4>
+<p>
+Data is deleted one log segment at a time. The log manager allows pluggable delete policies
to choose which files are eligible for deletion. The current policy deletes any log with a
modification time of more than <i>N</i> days ago, though a policy which retained
the last <i>N</i> GB could also be useful. To avoid locking reads while still
allowing deletes that modify the segment list we use a copy-on-write style segment list implementation
that provides consistent views to allow a binary search to proceed on an immutable static
snapshot view of the log segments while deletes are progressing.
+</p>
+<h4><a id="impl_guarantees" href="#impl_guarantees">Guarantees</a></h4>
+<p>
+The log provides a configuration parameter <i>M</i> which controls the maximum
number of messages that are written before forcing a flush to disk. On startup a log recovery
process is run that iterates over all messages in the newest log segment and verifies that
each message entry is valid. A message entry is valid if the sum of its size and offset are
less than the length of the file AND the CRC32 of the message payload matches the CRC stored
with the message. In the event corruption is detected the log is truncated to the last valid
offset.
+</p>
+<p>
+Note that two kinds of corruption must be handled: truncation in which an unwritten block
is lost due to a crash, and corruption in which a nonsense block is ADDED to the file. The
reason for this is that in general the OS makes no guarantee of the write order between the
file inode and the actual block data so in addition to losing written data the file can gain
nonsense data if the inode is updated with a new size but a crash occurs before the block
containing that data is written. The CRC detects this corner case, and prevents it from corrupting
the log (though the unwritten messages are, of course, lost).
+</p>
+
+<h3><a id="distributionimpl" href="#distributionimpl">5.6 Distribution</a></h3>
+<h4><a id="impl_offsettracking" href="#impl_offsettracking">Consumer Offset Tracking</a></h4>
+<p>
+The high-level consumer tracks the maximum offset it has consumed in each partition and periodically
commits its offset vector so that it can resume from those offsets in the event of a restart.
Kafka provides the option to store all the offsets for a given consumer group in a designated
broker (for that group) called the <i>offset manager</i>. i.e., any consumer instance
in that consumer group should send its offset commits and fetches to that offset manager (broker).
The high-level consumer handles this automatically. If you use the simple consumer you will
need to manage offsets manually. This is currently unsupported in the Java simple consumer
which can only commit or fetch offsets in ZooKeeper. If you use the Scala simple consumer
you can discover the offset manager and explicitly commit or fetch offsets to the offset manager.
A consumer can look up its offset manager by issuing a GroupCoordinatorRequest to any Kafka
broker and reading the GroupCoordinatorResponse which will c
 ontain the offset manager. The consumer can then proceed to commit or fetch offsets from
the offsets manager broker. In case the offset manager moves, the consumer will need to rediscover
the offset manager. If you wish to manage your offsets manually, you can take a look at these
<a href="https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka">code
samples that explain how to issue OffsetCommitRequest and OffsetFetchRequest</a>.
+</p>
+
+<p>
+When the offset manager receives an OffsetCommitRequest, it appends the request to a special
<a href="#compaction">compacted</a> Kafka topic named <i>__consumer_offsets</i>.
The offset manager sends a successful offset commit response to the consumer only after all
the replicas of the offsets topic receive the offsets. In case the offsets fail to replicate
within a configurable timeout, the offset commit will fail and the consumer may retry the
commit after backing off. (This is done automatically by the high-level consumer.) The brokers
periodically compact the offsets topic since it only needs to maintain the most recent offset
commit per partition. The offset manager also caches the offsets in an in-memory table in
order to serve offset fetches quickly.
+</p>
+
+<p>
+When the offset manager receives an offset fetch request, it simply returns the last committed
offset vector from the offsets cache. In case the offset manager was just started or if it
just became the offset manager for a new set of consumer groups (by becoming a leader for
a partition of the offsets topic), it may need to load the offsets topic partition into the
cache. In this case, the offset fetch will fail with an OffsetsLoadInProgress exception and
the consumer may retry the OffsetFetchRequest after backing off. (This is done automatically
by the high-level consumer.)
+</p>
+
+<h5><a id="offsetmigration" href="#offsetmigration">Migrating offsets from ZooKeeper
to Kafka</a></h5>
+<p>
+Kafka consumers in earlier releases store their offsets by default in ZooKeeper. It is possible
to migrate these consumers to commit offsets into Kafka by following these steps:
+<ol>
+   <li>Set <code>offsets.storage=kafka</code> and <code>dual.commit.enabled=true</code>
in your consumer config.
+   </li>
+   <li>Do a rolling bounce of your consumers and then verify that your consumers are
healthy.
+   </li>
+   <li>Set <code>dual.commit.enabled=false</code> in your consumer config.
+   </li>
+   <li>Do a rolling bounce of your consumers and then verify that your consumers are
healthy.
+   </li>
+</ol>
+A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be performed using the
above steps if you set <code>offsets.storage=zookeeper</code>.
+</p>
+
+<h4><a id="impl_zookeeper" href="#impl_zookeeper">ZooKeeper Directories</a></h4>
+<p>
+The following gives the ZooKeeper structures and algorithms used for co-ordination between
consumers and brokers.
+</p>
+
+<h4><a id="impl_zknotation" href="#impl_zknotation">Notation</a></h4>
+<p>
+When an element in a path is denoted [xyz], that means that the value of xyz is not fixed
and there is in fact a ZooKeeper znode for each possible value of xyz. For example /topics/[topic]
would be a directory named /topics containing a sub-directory for each topic name. Numerical
ranges are also given such as [0...5] to indicate the subdirectories 0, 1, 2, 3, 4. An arrow
-> is used to indicate the contents of a znode. For example /hello -> world would indicate
a znode /hello containing the value "world".
+</p>
+
+<h4><a id="impl_zkbroker" href="#impl_zkbroker">Broker Node Registry</a></h4>
+<pre>
+/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...}
(ephemeral node)
+</pre>
+<p>
+This is a list of all present broker nodes, each of which provides a unique logical broker
id which identifies it to consumers (which must be given as part of its configuration). On
startup, a broker node registers itself by creating a znode with the logical broker id under
/brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different
physical machine without affecting consumers. An attempt to register a broker id that is already
in use (say because two servers are configured with the same broker id) results in an error.
+</p>
+<p>
+Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration
is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers
it is no longer available).
+</p>
+<h4><a id="impl_zktopic" href="#impl_zktopic">Broker Topic Registry</a></h4>
+<pre>
+/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]}
(ephemeral node)
+</pre>
+
+<p>
+Each broker registers itself under the topics it maintains and stores the number of partitions
for that topic.
+</p>
+
+<h4><a id="impl_zkconsumers" href="#impl_zkconsumers">Consumers and Consumer
Groups</a></h4>
+<p>
+Consumers of topics also register themselves in ZooKeeper, in order to coordinate with each
other and balance the consumption of data. Consumers can also store their offsets in ZooKeeper
by setting <code>offsets.storage=zookeeper</code>. However, this offset storage
mechanism will be deprecated in a future release. Therefore, it is recommended to <a href="#offsetmigration">migrate
offsets storage to Kafka</a>.
+</p>
+
+<p>
+Multiple consumers can form a group and jointly consume a single topic. Each consumer in
the same group is given a shared group_id.
+For example if one consumer is your foobar process, which is run across three machines, then
you might assign this group of consumers the id "foobar". This group id is provided in the
configuration of the consumer, and is your way to tell the consumer which group it belongs
to.
+</p>
+
+<p>
+The consumers in a group divide up the partitions as fairly as possible, each partition is
consumed by exactly one consumer in a consumer group.
+</p>
+
+<h4><a id="impl_zkconsumerid" href="#impl_zkconsumerid">Consumer Id Registry</a></h4>
+<p>
+In addition to the group_id which is shared by all consumers in a group, each consumer is
given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes.
Consumer ids are registered in the following directory.
+<pre>
+/consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...}
(ephemeral node)
+</pre>
+Each of the consumers in the group registers under its group and creates a znode with its
consumer_id. The value of the znode contains a map of &lt;topic, #streams&gt;. This
id is simply used to identify each of the consumers which is currently active within a group.
This is an ephemeral node so it will disappear if the consumer process dies.
+</p>
+
+<h4><a id="impl_zkconsumeroffsets" href="#impl_zkconsumeroffsets">Consumer Offsets</a></h4>
+<p>
+Consumers track the maximum offset they have consumed in each partition. This value is stored
in a ZooKeeper directory if <code>offsets.storage=zookeeper</code>.
+</p>
+<pre>
+/consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent
node)
+</pre>
+
+<h4><a id="impl_zkowner" href="#impl_zkowner">Partition Owner registry</a></h4>
+
+<p>
+Each broker partition is consumed by a single consumer within a given consumer group. The
consumer must establish its ownership of a given partition before any consumption can begin.
To establish its ownership, a consumer writes its own id in an ephemeral node under the particular
broker partition it is claiming.
+</p>
+
+<pre>
+/consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
+</pre>
+
+<h4><a id="impl_clusterid" href="#impl_clusterid">Cluster Id</a></h4>
+
+<p>
+    The cluster id is a unique and immutable identifier assigned to a Kafka cluster. The
cluster id can have a maximum of 22 characters and the allowed characters are defined by the
regular expression [a-zA-Z0-9_\-]+, which corresponds to the characters used by the URL-safe
Base64 variant with no padding. Conceptually, it is auto-generated when a cluster is started
for the first time.
+</p>
+<p>
+    Implementation-wise, it is generated when a broker with version 0.10.1 or later is successfully
started for the first time. The broker tries to get the cluster id from the <code>/cluster/id</code>
znode during startup. If the znode does not exist, the broker generates a new cluster id and
creates the znode with this cluster id.
+</p>
+
+<h4><a id="impl_brokerregistration" href="#impl_brokerregistration">Broker node
registration</a></h4>
+
+<p>
+The broker nodes are basically independent, so they only publish information about what they
have. When a broker joins, it registers itself under the broker node registry directory and
writes information about its host name and port. The broker also register the list of existing
topics and their logical partitions in the broker topic registry. New topics are registered
dynamically when they are created on the broker.
+</p>
+
+<h4><a id="impl_consumerregistration" href="#impl_consumerregistration">Consumer
registration algorithm</a></h4>
+
+<p>
+When a consumer starts, it does the following:
+<ol>
+   <li> Register itself in the consumer id registry under its group.
+   </li>
+   <li> Register a watch on changes (new consumers joining or any existing consumers
leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers
within the group to which the changed consumer belongs.)
+   </li>
+   <li> Register a watch on changes (new brokers joining or any existing brokers leaving)
under the broker id registry. (Each change triggers rebalancing among all consumers in all
consumer groups.) </li>
+   <li> If the consumer creates a message stream using a topic filter, it also registers
a watch on changes (new topics being added) under the broker topic registry. (Each change
will trigger re-evaluation of the available topics to determine which topics are allowed by
the topic filter. A new allowed topic will trigger rebalancing among all consumers within
the consumer group.)</li>
+   <li> Force itself to rebalance within in its consumer group.
+   </li>
+</ol>
+</p>
+
+<h4><a id="impl_consumerrebalance" href="#impl_consumerrebalance">Consumer rebalancing
algorithm</a></h4>
+<p>
+The consumer rebalancing algorithms allows all the consumers in a group to come into consensus
on which consumer is consuming which partitions. Consumer rebalancing is triggered on each
addition or removal of both broker nodes and other consumers within the same group. For a
given topic and a given consumer group, broker partitions are divided evenly among consumers
within the group. A partition is always consumed by a single consumer. This design simplifies
the implementation. Had we allowed a partition to be concurrently consumed by multiple consumers,
there would be contention on the partition and some kind of locking would be required. If
there are more consumers than partitions, some consumers won't get any data at all. During
rebalancing, we try to assign partitions to consumers in such a way that reduces the number
of broker nodes each consumer has to connect to.
+</p>
+<p>
+Each consumer does the following during rebalancing:
+</p>
+<pre>
+   1. For each topic T that C<sub>i</sub> subscribes to
+   2.   let P<sub>T</sub> be all partitions producing topic T
+   3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub>
that consume topic T
+   4.   sort P<sub>T</sub> (so partitions on the same broker are clustered together)
+   5.   sort C<sub>G</sub>
+   6.   let i be the index position of C<sub>i</sub> in C<sub>G</sub>
and let N = size(P<sub>T</sub>)/size(C<sub>G</sub>)
+   7.   assign partitions from i*N to (i+1)*N - 1 to consumer C<sub>i</sub>
+   8.   remove current entries owned by C<sub>i</sub> from the partition owner
registry
+   9.   add newly assigned partitions to the partition owner registry
+        (we may need to re-try this until the original partition owner releases its ownership)
+</pre>
+<p>
+When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers
within the same group about the same time.
+</p>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ed0bb0d9/0101/introduction.html
----------------------------------------------------------------------
diff --git a/0101/introduction.html b/0101/introduction.html
new file mode 100644
index 0000000..3fbf7a3
--- /dev/null
+++ b/0101/introduction.html
@@ -0,0 +1,99 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<h3><a id="introduction" href="#introduction">1.1 Introduction</a></h3>
+Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality
of a messaging system, but with a unique design.
+<p>
+What does all that mean?
+<p>
+First let's review some basic messaging terminology:
+<ul>
+    <li>Kafka maintains feeds of messages in categories called <i>topics</i>.
+    <li>We'll call processes that publish messages to a Kafka topic <i>producers</i>.
+    <li>We'll call processes that subscribe to topics and process the feed of published
messages <i>consumers</i>.
+    <li>Kafka is run as a cluster comprised of one or more servers each of which is
called a <i>broker</i>.
+</ul>
+
+So, at a high level, producers send messages over the network to the Kafka cluster which
in turn serves them up to consumers like this:
+<div style="text-align: center; width: 100%">
+  <img src="images/producer_consumer.png">
+</div>
+
+Communication between the clients and the servers is done with a simple, high-performance,
language agnostic <a href="https://kafka.apache.org/protocol.html">TCP protocol</a>.
We provide a Java client for Kafka, but clients are available in <a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">many
languages</a>.
+
+<h4><a id="intro_topics" href="#intro_topics">Topics and Logs</a></h4>
+Let's first dive into the high-level abstraction Kafka provides&mdash;the topic.
+<p>
+A topic is a category or feed name to which messages are published. For each topic, the Kafka
cluster maintains a partitioned log that looks like this:
+<div style="text-align: center; width: 100%">
+  <img src="images/log_anatomy.png">
+</div>
+Each partition is an ordered, immutable sequence of messages that is continually appended
to&mdash;a commit log. The messages in the partitions are each assigned a sequential id
number called the <i>offset</i> that uniquely identifies each message within the
partition.
+<p>
+The Kafka cluster retains all published messages&mdash;whether or not they have been
consumed&mdash;for a configurable period of time. For example if the log retention is
set to two days, then for the two days after a message is published it is available for consumption,
after which it will be discarded to free up space. Kafka's performance is effectively constant
with respect to data size so retaining lots of data is not a problem.
+<p>
+In fact the only metadata retained on a per-consumer basis is the position of the consumer
in the log, called the "offset". This offset is controlled by the consumer: normally a consumer
will advance its offset linearly as it reads messages, but in fact the position is controlled
by the consumer and it can consume messages in any order it likes. For example a consumer
can reset to an older offset to reprocess.
+<p>
+This combination of features means that Kafka consumers are very cheap&mdash;they can
come and go without much impact on the cluster or on other consumers. For example, you can
use our command line tools to "tail" the contents of any topic without changing what is consumed
by any existing consumers.
+<p>
+The partitions in the log serve several purposes. First, they allow the log to scale beyond
a size that will fit on a single server. Each individual partition must fit on the server
that hosts it, but a topic may have many partitions so it can handle an arbitrary amount of
data. Second they act as the unit of parallelism&mdash;more on that in a bit.
+
+<h4><a id="intro_distribution" href="#intro_distribution">Distribution</a></h4>
+
+The partitions of the log are distributed over the servers in the Kafka cluster with each
server handling data and requests for a share of the partitions. Each partition is replicated
across a configurable number of servers for fault tolerance.
+<p>
+Each partition has one server which acts as the "leader" and zero or more servers which act
as "followers". The leader handles all read and write requests for the partition while the
followers passively replicate the leader. If the leader fails, one of the followers will automatically
become the new leader. Each server acts as a leader for some of its partitions and a follower
for others so load is well balanced within the cluster.
+
+<h4><a id="intro_producers" href="#intro_producers">Producers</a></h4>
+
+Producers publish data to the topics of their choice. The producer is responsible for choosing
which message to assign to which partition within the topic. This can be done in a round-robin
fashion simply to balance load or it can be done according to some semantic partition function
(say based on some key in the message). More on the use of partitioning in a second.
+
+<h4><a id="intro_consumers" href="#intro_consumers">Consumers</a></h4>
+
+Messaging traditionally has two models: <a href="http://en.wikipedia.org/wiki/Message_queue">queuing</a>
and <a href="http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern">publish-subscribe</a>.
In a queue, a pool of consumers may read from a server and each message goes to one of them;
in publish-subscribe the message is broadcast to all consumers. Kafka offers a single consumer
abstraction that generalizes both of these&mdash;the <i>consumer group</i>.
+<p>
+Consumers label themselves with a consumer group name, and each message published to a topic
is delivered to one consumer instance within each subscribing consumer group. Consumer instances
can be in separate processes or on separate machines.
+<p>
+If all the consumer instances have the same consumer group, then this works just like a traditional
queue balancing load over the consumers.
+<p>
+If all the consumer instances have different consumer groups, then this works like publish-subscribe
and all messages are broadcast to all consumers.
+<p>
+More commonly, however, we have found that topics have a small number of consumer groups,
one for each "logical subscriber". Each group is composed of many consumer instances for scalability
and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber
is a cluster of consumers instead of a single process.
+<p>
+
+<div style="float: right; margin: 20px; width: 500px" class="caption">
+  <img src="images/consumer-groups.png"><br>
+  A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer
group A has two consumer instances and group B has four.
+</div>
+<p>
+Kafka has stronger ordering guarantees than a traditional messaging system, too.
+<p>
+A traditional queue retains messages in-order on the server, and if multiple consumers consume
from the queue then the server hands out messages in the order they are stored. However, although
the server hands out messages in order, the messages are delivered asynchronously to consumers,
so they may arrive out of order on different consumers. This effectively means the ordering
of the messages is lost in the presence of parallel consumption. Messaging systems often work
around this by having a notion of "exclusive consumer" that allows only one process to consume
from a queue, but of course this means that there is no parallelism in processing.
+<p>
+Kafka does it better. By having a notion of parallelism&mdash;the partition&mdash;within
the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool
of consumer processes. This is achieved by assigning the partitions in the topic to the consumers
in the consumer group so that each partition is consumed by exactly one consumer in the group.
By doing this we ensure that the consumer is the only reader of that partition and consumes
the data in order. Since there are many partitions this still balances the load over many
consumer instances. Note however that there cannot be more consumer instances in a consumer
group than partitions.
+<p>
+Kafka only provides a total order over messages <i>within</i> a partition, not
between different partitions in a topic. Per-partition ordering combined with the ability
to partition data by key is sufficient for most applications. However, if you require a total
order over messages this can be achieved with a topic that has only one partition, though
this will mean only one consumer process per consumer group.
+
+<h4><a id="intro_guarantees" href="#intro_guarantees">Guarantees</a></h4>
+
+At a high-level Kafka gives the following guarantees:
+<ul>
+  <li>Messages sent by a producer to a particular topic partition will be appended
in the order they are sent. That is, if a message M1 is sent by the same producer as a message
M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the
log.
+  <li>A consumer instance sees messages in the order they are stored in the log.
+  <li>For a topic with replication factor N, we will tolerate up to N-1 server failures
without losing any messages committed to the log.
+</ul>
+More details on these guarantees are given in the design section of the documentation.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ed0bb0d9/0101/migration.html
----------------------------------------------------------------------
diff --git a/0101/migration.html b/0101/migration.html
new file mode 100644
index 0000000..5240d86
--- /dev/null
+++ b/0101/migration.html
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--#include virtual="../includes/header.html" -->
+<h2><a id="migration" href="#migration">Migrating from 0.7.x to 0.8</a></h2>
+
+0.8 is our first (and hopefully last) release with a non-backwards-compatible wire protocol,
ZooKeeper     layout, and on-disk data format. This was a chance for us to clean up a lot
of cruft and start fresh. This means performing a no-downtime upgrade is more painful than
normal&mdash;you cannot just swap in the new code in-place.
+
+<h3><a id="migration_steps" href="#migration_steps">Migration Steps</a></h3>
+
+<ol>
+    <li>Setup a new cluster running 0.8.
+    <li>Use the 0.7 to 0.8 <a href="tools.html">migration tool</a> to mirror
data from the 0.7 cluster into the 0.8 cluster.
+    <li>When the 0.8 cluster is fully caught up, redeploy all data <i>consumers</i>
running the 0.8 client and reading from the 0.8 cluster.
+    <li>Finally migrate all 0.7 producers to 0.8 client publishing data to the 0.8
cluster.
+    <li>Decommission the 0.7 cluster.
+    <li>Drink.
+</ol>
+
+<!--#include virtual="../includes/footer.html" -->


Mime
View raw message