kafka-commits mailing list archives

From gwens...@apache.org
Subject [49/51] [partial] kafka-site git commit: KAFKA-2425: Initial upload of Kafka documentation to git repository with intent to replace SVN
Date Fri, 02 Oct 2015 19:19:47 GMT
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/08/implementation.html
----------------------------------------------------------------------
diff --git a/08/implementation.html b/08/implementation.html
new file mode 100644
index 0000000..1b709db
--- /dev/null
+++ b/08/implementation.html
@@ -0,0 +1,346 @@
+<h3><a id="apidesign">5.1 API Design</a></h3>
+
+<h4>Producer APIs</h4>
+
+<p>
+The Producer API wraps the two low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>. 
+<pre>
+class Producer&lt;K,V&gt; {
+	
+  /* Sends the data, partitioned by key to the topic using either the */
+  /* synchronous or the asynchronous producer */
+  public void send(kafka.javaapi.producer.ProducerData&lt;K,V&gt; producerData);
+
+  /* Sends a list of data, partitioned by key to the topic using either */
+  /* the synchronous or the asynchronous producer */
+  public void send(java.util.List&lt;kafka.javaapi.producer.ProducerData&lt;K,V&gt;&gt; producerData);
+
+  /* Closes the producer and cleans up */	
+  public void close();
+
+}
+</pre>
+
+The goal is to expose all the producer functionality through a single API to the client.  
+
+The new producer -
+<ul>
+<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data - 	
+<p><code>kafka.producer.Producer</code> provides the ability to batch multiple produce requests (<code>producer.type=async</code>) before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters: as events arrive they are buffered in a queue until either <code>queue.time</code> or <code>batch.size</code> is reached. A background thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch of data and lets the <code>kafka.producer.DefaultEventHandler</code> serialize and send the data to the appropriate kafka broker partition (see the configuration sketch after this list).
+</p>
+</li>
+<li>handles the serialization of data through a user-specified <code>Encoder</code> - 
+<pre>
+interface Encoder&lt;T&gt; {
+  public Message toMessage(T data);
+}
+</pre>
+<p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code></p>
+</li>
+<li>provides zookeeper based automatic broker discovery - 
+<p>
+The zookeeper based broker discovery and load balancing can be used by specifying the zookeeper connection url through the <code>zk.connect</code> config parameter. For some applications, however, the dependence on zookeeper is inappropriate. In that case, the producer can take in a static list of brokers through the <code>broker.list</code> config parameter. Each produce request gets routed to a random broker partition in this case. If that broker is down, the produce request fails. 
+</p>
+</li>
+<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> - 
+<p>
+The routing decision is influenced by the <code>kafka.producer.Partitioner</code>. 
+<pre>
+interface Partitioner&lt;T&gt; {
+   int partition(T key, int numPartitions);
+}
+</pre>
+The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.	
+</p>
+</li>
+</ul>
+</p>
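+<p>
+Putting these pieces together, here is a rough sketch of how the producer above might be configured and used
+from Java. It is illustrative only: the property values and topic name are examples, the
+<code>example.MemberIdPartitioner</code> class is hypothetical, and the exact constructor signatures may
+differ slightly from the release you are running.
+</p>
+<pre>
+import java.util.Collections;
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.producer.ProducerConfig;
+
+Properties props = new Properties();
+props.put("zk.connect", "localhost:2181");          // zookeeper based broker discovery
+props.put("producer.type", "async");                // batch in the background send thread
+props.put("queue.time", "5000");                    // flush the queue after 5 seconds...
+props.put("batch.size", "200");                     // ...or once 200 events are buffered
+props.put("partitioner.class", "example.MemberIdPartitioner");  // hypothetical Partitioner implementation
+
+Producer&lt;String, String&gt; producer = new Producer&lt;String, String&gt;(new ProducerConfig(props));
+producer.send(new ProducerData&lt;String, String&gt;("my_topic", "member-42",
+    Collections.singletonList("hello")));
+producer.close();
+</pre>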
+
+<h4>Consumer APIs</h4>
+<p>
+We have 2 levels of consumer APIs. The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose.
+</p>
+<p>
+The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed. The high-level API also provides the ability to subscribe to topics that match a filter expression (i.e., either a whitelist or a blacklist regular expression).
+</p>
+
+<h5>Low-level API</h5>
+<pre>
+class SimpleConsumer {
+	
+  /* Send fetch request to a broker and get back a set of messages. */ 
+  public ByteBufferMessageSet fetch(FetchRequest request);
+
+  /* Send a list of fetch requests to a broker and get back a response set. */ 
+  public MultiFetchResponse multifetch(List&lt;FetchRequest&gt; fetches);
+
+  /**
+   * Get a list of valid offsets (up to maxSize) before the given time.
+   * The result is a list of offsets, in descending order.
+   * @param time: time in milliseconds,
+   *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
+   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
+   */
+  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
+}
+</pre>
+
+The low-level API is used to implement the high-level API and is also used directly by some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state.
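+<p>
+As a rough illustration of the simple API, the fragment below looks up the latest offset for one partition
+and issues a single fetch. The constructor arguments, the <code>FetchRequest</code> signature, and the
+<code>process(...)</code> helper are assumptions for this sketch and may differ from the exact release you run.
+</p>
+<pre>
+import kafka.api.FetchRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+// host, port, socket timeout and buffer size are example values
+SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 30000, 64 * 1024);
+
+// ask for the single latest offset available for partition 0 of "my_topic"
+long[] offsets = consumer.getOffsetsBefore("my_topic", 0,
+    kafka.api.OffsetRequest$.MODULE$.LATEST_TIME(), 1);
+
+// fetch up to 1MB of messages starting at that offset
+ByteBufferMessageSet messages =
+    consumer.fetch(new FetchRequest("my_topic", 0, offsets[0], 1024 * 1024));
+for (MessageAndOffset messageAndOffset : messages) {
+  // the payload is an opaque ByteBuffer; deserialization is up to the application
+  process(messageAndOffset.message().payload());   // process(...) is a hypothetical handler
+}
+</pre>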
+
+<h5>High-level API</h5>
+<pre>
+
+/* create a connection to the cluster */ 
+ConsumerConnector connector = Consumer.create(consumerConfig);
+
+interface ConsumerConnector {
+	
+  /**
+   * This method is used to get a list of KafkaStreams, which are iterators over
+   * MessageAndMetadata objects from which you can obtain messages and their
+   * associated metadata (currently only topic).
+   *  Input: a map of &lt;topic, #streams&gt;
+   *  Output: a map of &lt;topic, list of message streams&gt;
+   */
+  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap); 
+
+  /**
+   * You can also obtain a list of KafkaStreams, that iterate over messages
+   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
+   * whitelist or a blacklist which is a standard Java regex.)
+   */
+  public List&lt;KafkaStream&gt; createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams);
+
+  /* Commit the offsets of all messages consumed so far. */
+  public void commitOffsets();
+  
+  /* Shut down the connector */
+  public void shutdown();
+}
+</pre>
+<p>
+This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.
+</p>
+<p>
+The createMessageStreams call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. The API encourages creating many topic streams in a single call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream that createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e., if multiple topics are allowed by the filter).
+</p>
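+<p>
+A sketch of typical usage, continuing the fragment above: the configuration property names and the
+iteration style shown here are assumptions based on the 0.7/0.8-era consumer and may vary by release.
+</p>
+<pre>
+Properties props = new Properties();
+props.put("zk.connect", "localhost:2181");   // example values
+props.put("groupid", "my_group");
+ConsumerConnector connector = Consumer.create(new ConsumerConfig(props));
+
+// ask for two streams for "my_topic"; each stream should be consumed by a single thread
+Map&lt;String, Integer&gt; topicCountMap = new HashMap&lt;String, Integer&gt;();
+topicCountMap.put("my_topic", 2);
+Map&lt;String, List&lt;KafkaStream&gt;&gt; streams = connector.createMessageStreams(topicCountMap);
+
+// iterate one of the streams; the iterator blocks until a message arrives
+for (MessageAndMetadata messageAndMetadata : streams.get("my_topic").get(0)) {
+  handle(messageAndMetadata.message());   // handle(...) is a hypothetical handler
+}
+
+connector.commitOffsets();
+connector.shutdown();
+</pre>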
+
+<h3><a id="networklayer">5.2 Network Layer</a></h3>
+<p>
+The network layer is a fairly straightforward NIO server, and will not be described in great detail. The sendfile implementation is done by giving the <code>MessageSet</code> interface a <code>writeTo</code> method. This allows the file-backed message set to use the more efficient <code>transferTo</code> implementation instead of an in-process buffered write. The threading model is a single acceptor thread and <i>N</i> processor threads which handle a fixed number of connections each. This design has been pretty thoroughly tested <a href="http://sna-projects.com/blog/2009/08/introducing-the-nio-socketserver-implementation">elsewhere</a> and found to be simple to implement and fast. The protocol is kept quite simple to allow for future implementation of clients in other languages.
+</p>
+<h3><a id="messages">5.3 Messages</a></h3>
+<p>
+Messages consist of a fixed-size header and variable length opaque byte array payload. The header contains a format version and a CRC32 checksum to detect corruption or truncation. Leaving the payload opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The <code>MessageSet</code> interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO <code>Channel</code>.
+
+<h3><a id="messageformat">5.4 Message Format</a></h3>
+
+<pre>
+	/** 
+	 * A message. The format of an N byte message is the following: 
+	 * 
+	 * If magic byte is 0 
+	 * 
+	 * 1. 1 byte "magic" identifier to allow format changes 
+	 * 
+	 * 2. 4 byte CRC32 of the payload 
+	 * 
+	 * 3. N - 5 byte payload 
+	 * 
+	 * If magic byte is 1 
+	 * 
+	 * 1. 1 byte "magic" identifier to allow format changes 
+	 * 
+	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
+	 * 
+	 * 3. 4 byte CRC32 of the payload 
+	 * 
+	 * 4. N - 6 byte payload 
+	 * 
+	 */
+</pre>
+</p>
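+<p>
+For illustration, the sketch below decodes a single message in the magic byte 1 layout above and verifies
+its checksum using only standard Java classes. The buffer is assumed to contain exactly one N byte message
+positioned at the start of the header.
+</p>
+<pre>
+import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
+
+public class MessageDecoder {
+  /** Returns the payload of a magic=1 message, or throws if the stored CRC does not match. */
+  public static byte[] decode(ByteBuffer message) {
+    byte magic = message.get();                        // 1 byte "magic" identifier
+    if (magic != 1)
+      throw new IllegalArgumentException("unexpected magic byte: " + magic);
+    byte attributes = message.get();                   // 1 byte attributes (e.g. compression codec)
+    long storedCrc = message.getInt() &amp; 0xffffffffL;    // 4 byte CRC32 of the payload
+    byte[] payload = new byte[message.remaining()];    // N - 6 byte payload
+    message.get(payload);
+
+    CRC32 crc = new CRC32();
+    crc.update(payload);
+    if (crc.getValue() != storedCrc)
+      throw new IllegalStateException("corrupt message: CRC mismatch");
+    return payload;
+  }
+}
+</pre>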
+<h3><a id="log">5.5 Log</a></h3>
+<p>
+A log for a topic named "my_topic" with two partitions consists of two directories (namely <code>my_topic_0</code> and <code>my_topic_1</code>) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries"; each log entry is a 4 byte integer <i>N</i> storing the message length which is followed by the <i>N</i> message bytes. Each message is uniquely identified by a 64-bit integer <i>offset</i> giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given below. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly <i>S</i> bytes from the previous file where <i>S</i> is the max log file size given in the configuration.
+</p>
+<p>
+The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. This format is as follows:
+</p>
+<pre>
+On-disk format of a message
+
+message length : 4 bytes (value: 1+4+n) 
+"magic" value  : 1 byte
+crc            : 4 bytes
+payload        : n bytes
+</pre>
+<p>
+The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural&mdash;both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.
+</p>
+<img src="../images/kafka_log.png">
+<h4>Writes</h4>
+<p>
+The log allows serial appends which always go to the last file. This file is rolled over to a fresh file when it reaches a configurable size (say 1GB). The log takes two configuration parameters: <i>M</i>, which gives the number of messages to write before forcing the OS to flush the file to disk, and <i>S</i>, which gives a number of seconds after which a flush is forced. This gives a durability guarantee of losing at most <i>M</i> messages or <i>S</i> seconds of data in the event of a system crash.
+</p>
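+<p>
+A minimal sketch of that flush decision, with <i>M</i> and <i>S</i> passed in as hypothetical configuration
+values:
+</p>
+<pre>
+// Flush if either M messages have accumulated or S seconds have passed since the last flush.
+boolean shouldFlush(long unflushedMessages, long lastFlushMillis, long m, long sSeconds) {
+  long sinceLastFlushMillis = System.currentTimeMillis() - lastFlushMillis;
+  return unflushedMessages &gt;= m || sinceLastFlushMillis &gt;= sSeconds * 1000;
+}
+</pre>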
+<h4>Reads</h4>
+<p>
+Reads are done by giving the 64-bit logical offset of a message and an <i>S</i>-byte max chunk size. This will return an iterator over the messages contained in the <i>S</i>-byte buffer. <i>S</i> is intended to be larger than any single message, but in the event of an abnormally large message, the read can be retried multiple times, each time doubling the buffer size, until the message is read successfully. A maximum message and buffer size can be specified to make the server reject messages larger than some size, and to give a bound to the client on the maximum it need ever read to get a complete message. It is likely that the read buffer ends with a partial message; this is easily detected by the size delimiting.
+</p>
+<p>
+The actual process of reading from an offset requires first locating the log segment file in which the data is stored, calculating the file-specific offset from the global offset value, and then reading from that file offset. The search is done as a simple binary search variation against an in-memory range maintained for each file.
+</p>
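+<p>
+A sketch of that lookup, assuming the in-memory ranges are kept as an ascending array of segment start
+offsets (a simplified stand-in for the per-file range structure described above):
+</p>
+<pre>
+// Return the index of the log segment containing the given global offset:
+// the last segment whose start offset is &lt;= the requested offset.
+int findSegment(long[] segmentStartOffsets, long offset) {
+  int low = 0, high = segmentStartOffsets.length - 1;
+  while (low &lt; high) {
+    int mid = (low + high + 1) / 2;          // bias upward so the loop terminates
+    if (segmentStartOffsets[mid] &lt;= offset)
+      low = mid;
+    else
+      high = mid - 1;
+  }
+  return low;                                // file-local position = offset - segmentStartOffsets[low]
+}
+</pre>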
+<p>
+The log provides the capability of getting the most recently written message to allow clients to start subscribing as of "right now". This is also useful in the case that the consumer fails to consume its data within its SLA-specified number of days. In this case when the client attempts to consume a non-existent offset it is given an OutOfRangeException and can either reset itself or fail as appropriate to the use case.
+</p>
+
+<p> The following is the format of the results sent to the consumer.
+
+<pre>
+MessageSetSend (fetch result)
+
+total length     : 4 bytes
+error code       : 2 bytes
+message 1        : x bytes
+...
+message n        : x bytes
+</pre>
+
+<pre>
+MultiMessageSetSend (multiFetch result)
+
+total length       : 4 bytes
+error code         : 2 bytes
+messageSetSend 1
+...
+messageSetSend n
+</pre>
+
+<h4>Deletes</h4>
+<p>
+Data is deleted one log segment at a time. The log manager allows pluggable delete policies to choose which files are eligible for deletion. The current policy deletes any log with a modification time of more than <i>N</i> days ago, though a policy which retained the last <i>N</i> GB could also be useful. To avoid locking reads while still allowing deletes that modify the segment list we use a copy-on-write style segment list implementation that provides consistent views to allow a binary search to proceed on an immutable static snapshot view of the log segments while deletes are progressing.
+</p>
+<h4>Guarantees</h4>
+<p>
+The log provides a configuration parameter <i>M</i> which controls the maximum number of messages that are written before forcing a flush to disk. On startup a log recovery process is run that iterates over all messages in the newest log segment and verifies that each message entry is valid. A message entry is valid if the sum of its size and offset is less than the length of the file AND the CRC32 of the message payload matches the CRC stored with the message. In the event corruption is detected the log is truncated to the last valid offset.
+</p>
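+<p>
+The recovery check can be sketched as follows; <code>readEntry</code>, <code>crcMatches</code>, and the
+<code>LogSegment</code>/<code>Entry</code> types are hypothetical stand-ins for the real message parsing
+code:
+</p>
+<pre>
+// Scan the newest segment and return the offset at which the log should be truncated.
+long recover(LogSegment segment) {
+  long validUpTo = 0;
+  while (validUpTo &lt; segment.length()) {
+    Entry entry = readEntry(segment, validUpTo);      // reads the 4 byte size and the message bytes
+    // valid only if the entry fits inside the file and the payload CRC matches the stored CRC
+    if (entry == null
+        || validUpTo + entry.sizeInBytes() &gt; segment.length()
+        || !crcMatches(entry))
+      break;
+    validUpTo += entry.sizeInBytes();
+  }
+  return validUpTo;                                   // truncate the segment to this offset
+}
+</pre>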
+<p>
+Note that two kinds of corruption must be handled: truncation in which an unwritten block is lost due to a crash, and corruption in which a nonsense block is ADDED to the file. The reason for this is that in general the OS makes no guarantee of the write order between the file inode and the actual block data, so in addition to losing written data the file can gain nonsense data if the inode is updated with a new size but a crash occurs before the block containing that data is written. The CRC detects this corner case, and prevents it from corrupting the log (though the unwritten messages are, of course, lost).
+</p>
+
+<h3><a id="distributionimpl">5.6 Distribution</a></h3>
+<h4>Zookeeper Directories</h4>
+<p>
+The following gives the zookeeper structures and algorithms used for co-ordination between consumers and brokers.
+</p>
+
+<h4>Notation</h4>
+<p>
+When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a zookeeper znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a sub-directory for each topic name. Numerical ranges are also given, such as [0...5] to indicate the subdirectories 0, 1, 2, 3, 4, 5. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world".
+</p>
+
+<h4>Broker Node Registry</h4>
+<pre>
+/brokers/ids/[0...N] --> host:port (ephemeral node)
+</pre>
+<p>
+This is a list of all present broker nodes, each of which provides a unique logical broker id (which must be given as part of its configuration) that identifies it to consumers. On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.
+</p>
+<p>
+Since the broker registers itself in zookeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shut down or dies (thus notifying consumers it is no longer available).
+</p>
+<h4>Broker Topic Registry</h4>
+<pre>
+/brokers/topics/[topic]/[0...N] --> nPartitions (ephemeral node)
+</pre>
+
+<p>
+Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.
+</p>
+
+<h4>Consumers and Consumer Groups</h4>
+<p>
+Consumers of topics also register themselves in Zookeeper, in order to balance the consumption of data and track their offsets in each partition for each broker they consume from.
+</p>
+
+<p>
+Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. 
+For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.
+</p>
+
+<p>
+The consumers in a group divide up the partitions as fairly as possible; each partition is consumed by exactly one consumer in a consumer group.
+</p>
+
+<h4>Consumer Id Registry</h4>
+<p>
+In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory.
+<pre>
+/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)
+</pre>
+Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of &lt;topic, #streams&gt;. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies.
+</p>
+
+<h4>Consumer Offset Tracking</h4>
+<p>
+Consumers track the maximum offset they have consumed in each partition. This value is stored in a zookeeper directory
+</p>
+<pre>
+/consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)
+</pre>
+
+<h4>Partition Owner registry</h4>
+
+<p>
+Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.
+</p>
+
+<pre>
+/consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
+</pre>
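+<p>
+For illustration, the fragment below claims ownership of one partition by creating the ephemeral znode
+above with the standard ZooKeeper client; the consumer does this internally during rebalancing, retrying
+until the previous owner releases the node. The paths and ids are example values.
+</p>
+<pre>
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+
+ZooKeeper zk = new ZooKeeper("localhost:2181", 6000, null);
+String path = "/consumers/foobar/owners/my_topic/1-0";   // /consumers/[group_id]/owners/[topic]/[partition_id]
+try {
+  // ephemeral: the claim disappears automatically if this consumer process dies
+  zk.create(path, "host1:uuid".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+} catch (KeeperException.NodeExistsException e) {
+  // another consumer still owns this partition; retry after it releases ownership
+}
+</pre>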
+
+<h4>Broker node registration</h4>
+
+<p>
+The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also registers the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.
+</p>
+
+<h4>Consumer registration algorithm</h4>
+
+<p>
+When a consumer starts, it does the following:
+<ol>
+   <li> Register itself in the consumer id registry under its group.
+   </li>
+   <li> Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.)
+   </li>
+   <li> Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. (Each change triggers rebalancing among all consumers in all consumer groups.) </li>
+   <li> If the consumer creates a message stream using a topic filter, it also registers a watch on changes (new topics being added) under the broker topic registry. (Each change will trigger re-evaluation of the available topics to determine which topics are allowed by the topic filter. A new allowed topic will trigger rebalancing among all consumers within the consumer group.)</li>
+   <li> Force itself to rebalance within its consumer group.
+   </li>
+</ol>
+</p>
+
+<h4>Consumer rebalancing algorithm</h4>
+<p>
+The consumer rebalancing algorithm allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. This design simplifies the implementation. Had we allowed a partition to be concurrently consumed by multiple consumers, there would be contention on the partition and some kind of locking would be required. If there are more consumers than partitions, some consumers won't get any data at all. During rebalancing, we try to assign partitions to consumers in a way that reduces the number of broker nodes each consumer has to connect to.
+</p>
+<p>
+Each consumer does the following during rebalancing:
+</p>
+<pre>
+   1. For each topic T that C<sub>i</sub> subscribes to 
+   2.   let P<sub>T</sub> be all partitions for topic T
+   3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> that consume topic T
+   4.   sort P<sub>T</sub> (so partitions on the same broker are clustered together)
+   5.   sort C<sub>G</sub>
+   6.   let i be the index position of C<sub>i</sub> in C<sub>G</sub> and let N = size(P<sub>T</sub>)/size(C<sub>G</sub>)
+   7.   assign partitions from i*N to (i+1)*N - 1 to consumer C<sub>i</sub>
+   8.   remove current entries owned by C<sub>i</sub> from the partition owner registry
+   9.   add newly assigned partitions to the partition owner registry
+        (we may need to re-try this until the original partition owner releases its ownership)
+</pre>
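+<p>
+In Java, the assignment arithmetic in steps 6 and 7 amounts to a simple slice of the sorted partition list.
+The remainder handling shown here, for the case where the partitions do not divide evenly, is an assumption
+about how the leftover partitions are spread:
+</p>
+<pre>
+// Given the sorted partition list P_T and the size of the sorted consumer list C_G,
+// compute the slice of partitions owned by the consumer at index i.
+List&lt;String&gt; assignedPartitions(List&lt;String&gt; sortedPartitions, int numConsumers, int i) {
+  int n = sortedPartitions.size() / numConsumers;       // N = size(P_T) / size(C_G)
+  int extra = sortedPartitions.size() % numConsumers;   // partitions left over from uneven division
+  int start = i * n + Math.min(i, extra);               // the first 'extra' consumers take one more each
+  int count = n + (i &lt; extra ? 1 : 0);
+  return sortedPartitions.subList(start, start + count);
+}
+</pre>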
+<p>
+When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers within the same group at about the same time.
+</p>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/08/introduction.html
----------------------------------------------------------------------
diff --git a/08/introduction.html b/08/introduction.html
new file mode 100644
index 0000000..485ce8b
--- /dev/null
+++ b/08/introduction.html
@@ -0,0 +1,82 @@
+<h3><a id="introduction">1.1 Introduction</a></h3>
+Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
+<p>
+What does all that mean?
+<p>
+First let's review some basic messaging terminology:
+<ul>
+	<li>Kafka maintains feeds of messages in categories called <i>topics</i>.
+	<li>We'll call processes that publish messages to a Kafka topic <i>producers</i>.
+	<li>We'll call processes that subscribe to topics and process the feed of published messages <i>consumers</i>.
+	<li>Kafka is run as a cluster comprised of one or more servers each of which is called a <i>broker</i>.
+</ul>
+
+So, at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers like this:
+<div style="text-align: center; width: 100%">
+  <img src="../images/producer_consumer.png">
+</div>
+
+Communication between the clients and the servers is done with a simple, high-performance, language agnostic <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">TCP protocol</a>. We provide a java client for Kafka, but clients are available in <a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">many languages</a>.
+
+<h4>Topics and Logs</h4>
+Let's first dive into the high-level abstraction Kafka provides&mdash;the topic.
+<p>
+A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log that looks like this:
+<div style="text-align: center; width: 100%">
+  <img src="../images/log_anatomy.png">
+</div>
+Each partition is an ordered, immutable sequence of messages that is continually appended to&mdash;a commit log. The messages in the partitions are each assigned a sequential id number called the <i>offset</i> that uniquely identifies each message within the partition.
+<p>
+The Kafka cluster retains all published messages&mdash;whether or not they have been consumed&mdash;for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so retaining lots of data is not a problem.
+<p>
+In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the "offset". This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.
+<p>
+This combination of features means that Kafka consumers are very cheap&mdash;they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to "tail" the contents of any topic without changing what is consumed by any existing consumers.
+<p>
+The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism&mdash;more on that in a bit. 
+
+<h4>Distribution</h4>
+
+The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
+<p>
+Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
+
+<h4>Producers</h4>
+
+Producers publish data to the topics of their choice. The producer is able to choose which message to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the message). More on the use of partitioning in a second.
+
+<h4>Consumers</h4>
+
+Messaging traditionally has two models: <a href="http://en.wikipedia.org/wiki/Message_queue">queuing</a> and <a href="http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern">publish-subscribe</a>. In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe the message is broadcast to all consumers. Kafka offers a single consumer abstraction that generalizes both of these&mdash;the <i>consumer group</i>.
+<p>
+Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
+<p>
+If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.
+<p>
+If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers. 
+<p>
+More commonly, however, we have found that topics have a small number of consumer groups, one for each "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.
+<p>
+
+<div style="float: right; margin: 20px; width: 500px" class="caption">
+  <img src="../images/consumer-groups.png"><br>
+  A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.
+</div>
+<p>
+Kafka has stronger ordering guarantees than a traditional messaging system, too.
+<p>
+A traditional queue retains messages in-order on the server, and if multiple consumers consume from the queue then the server hands out messages in the order they are stored. However, although the server hands out messages in order, the messages are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the messages is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of "exclusive consumer" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing.
+<p>
+Kafka does it better. By having a notion of parallelism&mdash;the partition&mdash;within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances than partitions.
+<p>
+Kafka only provides a total order over messages <i>within</i> a partition. This combined with the ability to partition data by key is sufficient for the vast majority of applications. However, if you require a total order over messages this can be achieved with a topic that has only one partition, though this will mean only one consumer process.
+
+<h4>Guarantees</h4>
+
+At a high-level Kafka gives the following guarantees:
+<ul>
+  <li>Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a message M1 is sent by the same producer as a message M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
+  <li>A consumer instance sees messages in the order they are stored in the log.
+  <li>For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any messages committed to the log.
+</ul>
+More details on these guarantees are given in the design section of the documentation.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/08/migration.html
----------------------------------------------------------------------
diff --git a/08/migration.html b/08/migration.html
new file mode 100644
index 0000000..b5d5a82
--- /dev/null
+++ b/08/migration.html
@@ -0,0 +1,17 @@
+<!--#include virtual="../includes/header.html" -->
+<h2>Migrating from 0.7.x to 0.8</h2>
+
+0.8 is our first (and hopefully last) release with a non-backwards-compatible wire protocol, zookeeper layout, and on-disk data format. This was a chance for us to clean up a lot of cruft and start fresh. This means performing a no-downtime upgrade is more painful than normal&mdash;you cannot just swap in the new code in-place.
+
+<h3>Migration Steps</h3>
+
+<ol>
+	<li>Set up a new cluster running 0.8.
+	<li>Use the 0.7 to 0.8 <a href="tools.html">migration tool</a> to mirror data from the 0.7 cluster into the 0.8 cluster.
+	<li>When the 0.8 cluster is fully caught up, redeploy all data <i>consumers</i> running the 0.8 client and reading from the 0.8 cluster.
+	<li>Finally migrate all 0.7 producers to 0.8 client publishing data to the 0.8 cluster.
+	<li>Decommission the 0.7 cluster.
+	<li>Drink.
+</ol>
+
+<!--#include virtual="../includes/footer.html" -->
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/08/ops.html
----------------------------------------------------------------------
diff --git a/08/ops.html b/08/ops.html
new file mode 100644
index 0000000..765bc90
--- /dev/null
+++ b/08/ops.html
@@ -0,0 +1,334 @@
+Here is some information on actually running Kafka as a production system based on usage and experience at LinkedIn. Please send us any additional tips you know of.
+
+<h3><a id="datacenters">6.1 Datacenters</a></h3>
+Some deployments will need to manage a data pipeline that spans multiple datacenters. Our approach to this is to deploy a local Kafka cluster in each datacenter, with machines in each location interacting only with their local cluster.
+<p>
+For applications that need a global view of all data we use the <a href="/08/tools.html">mirror maker tool</a> to provide clusters which have aggregate data mirrored from all datacenters. These aggregator clusters are used for reads by applications that require this.
+<p>
+Likewise, in order to support data load into Hadoop, which resides in separate facilities, we provide local read-only clusters that mirror the production data centers in the facilities where this data load occurs.
+<p>
+This allows each facility to stand alone and operate even if the inter-datacenter links are unavailable: when this occurs the mirroring falls behind until the link is restored at which time it catches up.
+<p>
+This deployment pattern allows datacenters to act as independent entities and allows us to manage and tune inter-datacenter replication centrally.
+<p>
+This is not the only possible deployment pattern. It is possible to read from or write to a remote Kafka cluster over the WAN though TCP tuning will be necessary for high-latency links.
+<p>
+It is generally not advisable to run a single Kafka cluster that spans multiple datacenters as this will incur very high replication latency both for Kafka writes and Zookeeper writes and neither Kafka nor Zookeeper will remain available if the network partitions.
+
+<h3><a id="config">6.2 Kafka Configuration</a></h3>
+Kafka 0.8 is the version we currently run. We are running with replication enabled but with producer acks = 1. 
+<P>
+<h4><a id="serverconfig">Important Server Configurations</a></h4>
+
+The most important server configurations for performance are those that control the disk flush rate. The more often data is flushed to disk, the more "seek-bound" Kafka will be and the lower the throughput. However very low application flush rates can lead to high latency when the flush finally does occur (because of the volume of data that must be flushed). See the section below on application versus OS flush.
+
+<h4><a id="clientconfig">Important Client Configurations</a></h4>
+The most important producer configurations control
+<ul>
+	<li>compression</li>
+	<li>sync vs. async production</li>
+	<li>batch size (for async producers)</li>
+</ul>
+The most important consumer configuration is the fetch size.
+<p>
+All configurations are documented in the <a href="#configuration">configuration</a> section.
+<p>
+<h4><a id="prodconfig">A Production Server Config</a></h4>
+Here is our production server configuration:
+<pre>
+# Replication configurations
+num.replica.fetchers=4
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.high.watermark.checkpoint.interval.ms=5000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+
+# Log configuration
+num.partitions=8
+message.max.bytes=1000000
+auto.create.topics.enable=true
+log.index.interval.bytes=4096
+log.index.size.max.bytes=10485760
+log.retention.hours=168
+log.flush.interval.ms=10000
+log.flush.interval.messages=20000
+log.flush.scheduler.interval.ms=2000
+log.roll.hours=168
+log.retention.check.interval.ms=300000
+log.segment.bytes=1073741824
+
+# ZK configuration
+zk.connection.timeout.ms=6000
+zk.sync.time.ms=2000
+
+# Socket server configuration
+num.io.threads=8
+num.network.threads=8
+socket.request.max.bytes=104857600
+socket.receive.buffer.bytes=1048576
+socket.send.buffer.bytes=1048576
+queued.max.requests=16
+fetch.purgatory.purge.interval.requests=100
+producer.purgatory.purge.interval.requests=100
+</pre>
+
+Our client configuration varies a fair amount between different use cases.
+
+<h3><a id="java">Java Version</a></h3>
+Any version of Java 1.6 or later should work fine; we are using 1.6.0_21.
+
+Here are our command line options:
+<pre>
+java -server -Xms3072m -Xmx3072m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC 
+     -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSConcurrentMTEnabled -XX:+CMSScavengeBeforeRemark 
+     -XX:CMSInitiatingOccupancyFraction=30 
+     -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution 
+     -Xloggc:logs/gc.log -Djava.awt.headless=true
+     -Dcom.sun.management.jmxremote -classpath &lt;long list of jars&gt; the.actual.Class
+	</pre>
+	
+<h3><a id="hwandos">6.4 Hardware and OS</a></h3>
+We are using dual quad-core Intel Xeon machines with 24GB of memory.
+<p>
+You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput*30.
+<p>
+The disk throughput is important. We have 8x7200 rpm SATA drives. In general disk throughput is the performance bottleneck, and more disks are better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you force flush often then higher RPM SAS drives may be better).
+
+<h4><a id="os">OS</a></h4>
+Kafka should run well on any unix system and has been tested on Linux and Solaris.
+<p>
+We have seen a few issues running on Windows and Windows is not currently a well supported platform though we would be happy to change that.
+<p>
+You likely don't need to do much OS-level tuning though there are a few things that will help performance. 
+<p>
+Two configurations that may be important:
+<ul>
+	<li>We upped the number of file descriptors since we have lots of topics and lots of connections.
+	<li>We upped the max socket buffer size to enable high-performance data transfer between data centers <a href="http://www.psc.edu/index.php/networking/641-tcp-tune">described here</a>.
+</ul>
+
+<h4><a id="diskandfs">Disks and Filesystem</a></h4>
+To ensure good latency, we recommend using multiple drives to get good throughput and not sharing the drives used for Kafka data with application logs or other OS filesystem activity. As of 0.8 you can either RAID these drives together into a single volume or format and mount each drive as its own directory. Since Kafka has replication the redundancy provided by RAID can also be provided at the application level. This choice has several tradeoffs.
+<p>
+If you configure multiple data directories partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.
+<p>
+RAID can potentially do better at balancing load between disks (although it doesn't always seem to) because it balances load at a lower level. The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.
+<p>
+Another potential benefit of RAID is the ability to tolerate disk failures. However our experience has been that rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
+
+<h4><a id="appvsosflush">Application vs. OS Flush Management</a></h4>
+Kafka always immediately writes all data to the filesystem and supports configuring the flush policy that controls when data is forced out of the OS cache and onto disk. This flush policy can be controlled to force data to disk after a period of time or after a certain number of messages has been written. There are several choices in this configuration.
+<p>
+Kafka must eventually call fsync to know that data was flushed. When recovering from a crash, for any log segment not known to be fsync'd, Kafka will check the integrity of each message by checking its CRC and also rebuild the accompanying offset index file as part of the recovery process executed on startup.
+<p>
+The frequency of application-level fsyncs has a large impact on both latency and throughput. Setting a large flush interval will improve throughput as the operating system can buffer the many small writes into a single large write. This works effectively even across many partitions all taking simultaneous writes provided enough memory is available for buffering. However doing this may have a significant impact on latency as in many filesystems (including ext2, ext3, and ext4) fsync is an operation which blocks all writes to the file. Because of this, allowing lots of data to accumulate and then calling flush can lead to large write latencies, as new writes on that partition will be blocked while lots of accumulated data is flushed to disk.
+<p>
+In 0.8 we support replication as a way to ensure that data that is written is durable in the face of server crashes. As a result we allow giving out data to consumers immediately and the flush interval does not impact consumer latency. However we still MUST flush each log segment when the log rolls over to a new segment. So although you can set a relatively lenient flush interval setting no flush interval at all will lead to a full segment's worth of data being flushed all at once which can be quite slow.
+After 0.8 we improved our recovery procedure, which allows us to avoid the blocking fsync when the log rolls. As a result, in all releases after 0.8 we recommend using replication and not setting any application-level flush settings&mdash;relying only on the OS and Kafka's own background flush. This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk; however, the paranoid may still prefer having both, and application-level fsync policies are still supported.
+<p>
+In general you don't need to do any low-level tuning of the filesystem, but in the next few sections we will go over some of this in case it is useful.
+
+<h4><a id="linuxflush">Understanding Linux OS Flush Behavior</a></h4>
+
+In Linux, data written to the filesystem is maintained in <a href="http://en.wikipedia.org/wiki/Page_cache">pagecache</a> until it must be written out to disk (due to an application-level fsync or the OS's own flush policy). The flushing of data is done by a set of background threads called pdflush (or in post 2.6.32 kernels "flusher threads").
+<p>
+Pdflush has a configurable policy that controls how much dirty data can be maintained in cache and for how long before it must be written back to disk. This policy is described <a href="http://www.westnet.com/~gsmith/content/linux-pdflush.htm">here</a>. When pdflush cannot keep up with the rate of data being written, it will eventually cause the writing process to block, incurring latency in the writes in order to slow down the accumulation of data.
+<p>
+You can see the current state of OS memory usage by doing
+<pre>
+  cat /proc/meminfo
+</pre>
+The meaning of these values is described in the link above.
+<p>
+Using pagecache has several advantages over an in-process cache for storing data that will be written out to disk:
+<ul>
+  <li>The I/O scheduler will batch together consecutive small writes into bigger physical writes which improves throughput.
+  <li>The I/O scheduler will attempt to re-sequence writes to minimize movement of the disk head which improves throughput.
+  <li>It automatically uses all the free memory on the machine
+</ul>
+
+<h4><a id="ext4">Ext4 Notes</a></h4>
+Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS supposedly handle locking during fsync better. We have only tried Ext4, though.
+<p>
+It is not necessary to tune these settings; however, those wanting to optimize performance have a few knobs that will help:
+<ul>
+  <li>data=writeback: Ext4 defaults to data=ordered which puts a strong order on some writes. Kafka does not require this ordering as it does very paranoid data recovery on all unflushed log segments. This setting removes the ordering constraint and seems to significantly reduce latency.
+  <li>Disabling journaling: Journaling is a tradeoff: it makes reboots faster after server crashes but it introduces a great deal of additional locking which adds variance to write performance. Those who don't care about reboot time and want to reduce a major source of write latency spikes can turn off journaling entirely.
+  <li>commit=num_secs: This tunes the frequency with which ext4 commits to its metadata journal. Setting this to a lower value reduces the loss of unflushed data during a crash. Setting this to a higher value will improve throughput.
+  <li>nobh: This setting controls additional ordering guarantees when using data=writeback mode. This should be safe with Kafka as we do not depend on write ordering, and it improves throughput and latency.
+  <li>delalloc: Delayed allocation means that the filesystem avoids allocating any blocks until the physical write occurs. This allows ext4 to allocate a large extent instead of smaller pages and helps ensure the data is written sequentially. This feature is great for throughput. It does seem to involve some locking in the filesystem which adds a bit of latency variance.
+</ul>
+	
+<h3><a id="monitoring">6.5 Monitoring</a></h3>
+
+Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.
+<p>
+The easiest way to see the available metrics is to fire up jconsole and point it at a running Kafka client or server; this will allow browsing all metrics with JMX.
+<p>
+We pay particular attention to, and do graphing and alerting on, the following metrics:
+<table class="data-table">
+<tbody><tr>
+      <th>Description</th>
+      <th>Mbean name</th>
+      <th>Normal value</th>
+    </tr>
+    <tr>
+      <td>Message in rate</td>
+      <td>"kafka.server":name="AllTopicsMessagesInPerSec", type="BrokerTopicMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Byte in rate</td>
+      <td>"kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Request rate</td>
+      <td>"kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Byte out rate</td>
+      <td>"kafka.server":name="AllTopicsBytesOutPerSec", type="BrokerTopicMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Log flush rate and time</td>
+      <td>"kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td># of under replicated partitions (|ISR| &lt; |all replicas|)</td>
+      <td>"kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager"</td>
+      <td>0</td>
+    </tr>
+    <tr>
+      <td>Is controller active on broker</td>
+      <td>"kafka.controller":name="ActiveControllerCount",type="KafkaController"</td>
+      <td>only one broker in the cluster should have 1</td>
+    </tr>
+    <tr>
+      <td>Leader election rate</td>
+      <td>"kafka.controller":name="LeaderElectionRateAndTimeMs", type="ControllerStats"</td>
+      <td>non-zero when there are broker failures</td>
+    </tr>
+    <tr>
+      <td>Unclean leader election rate</td>
+      <td>"kafka.controller":name="UncleanLeaderElectionsPerSec", type="ControllerStats"</td>
+      <td>0</td>
+    </tr>
+    <tr>
+      <td>Partition counts</td>
+      <td>"kafka.server":name="PartitionCount",type="ReplicaManager"</td>
+      <td>mostly even across brokers</td>
+    </tr>
+    <tr>
+      <td>Leader replica counts</td>
+      <td>"kafka.server":name="LeaderCount",type="ReplicaManager"</td>
+      <td>mostly even across brokers</td>
+    </tr>
+    <tr>
+      <td>ISR shrink rate</td>
+      <td>"kafka.server":name="ISRShrinksPerSec",type="ReplicaManager"</td>
+      <td>If a broker goes down, ISR for some of the partitions will
+	shrink. When that broker is up again, ISR will be expanded
+	once the replicas are fully caught up. Other than that, the
+	expected value for both ISR shrink rate and expansion rate is 0. </td>
+    </tr>
+    <tr>
+      <td>ISR expansion rate</td>
+      <td>"kafka.server":name="ISRExpandsPerSec",type="ReplicaManager"</td>
+      <td>See above</td>
+    </tr>
+    <tr>
+      <td>Max lag in messages btw follower and leader replicas</td>
+      <td>"kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager"</td>
+      <td>&lt; replica.lag.max.messages</td>
+    </tr>
+    <tr>
+      <td>Lag in messages per follower replica</td>
+      <td>"kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics"</td>
+      <td>&lt; replica.lag.max.messages</td>
+    </tr>
+    <tr>
+      <td>Requests waiting in the producer purgatory</td>
+      <td>"kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory"</td>
+      <td>non-zero if ack=-1 is used</td>
+    </tr>
+    <tr>
+      <td>Requests waiting in the fetch purgatory</td>
+      <td>"kafka.server":name="PurgatorySize",type="FetchRequestPurgatory"</td>
+      <td>size depends on fetch.wait.max.ms in the consumer</td>
+    </tr>
+    <tr>
+      <td>Request total time</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics"</td>
+      <td>broken into queue, local, remote and response send time</td>
+    </tr>
+    <tr>
+      <td>Time the request waiting in the request queue</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Time the request being processed at the leader</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Time the request waits for the follower</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics"</td>
+      <td>non-zero for produce requests when ack=-1</td>
+    </tr>
+    <tr>
+      <td>Time to send the response</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Number of messages the consumer lags behind the broker among
+      all partitions consumed</td>
+      <td>"kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager"</td>
+      <td>small and not growing</td>
+    </tr>
+    <tr>
+      <td>The min fetch rate among all fetchers to brokers in a consumer</td>
+      <td>"kafka.consumer":name="([-.\w]+)-MinFetch",type="ConsumerFetcherManager"</td>
+      <td>&gt;= 1000/fetch.wait.max.ms</td>
+    </tr>
+</tbody></table>
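+<p>
+Because these are plain JMX MBeans, any JMX-capable tool can read them. As a rough sketch, the fragment
+below polls one of the meters above over a remote JMX connection; the port and the
+<code>OneMinuteRate</code> attribute name are assumptions (attribute names depend on how Yammer Metrics
+exposes its meters).
+</p>
+<pre>
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
+JMXConnector connector = JMXConnectorFactory.connect(url);
+MBeanServerConnection mbeans = connector.getMBeanServerConnection();
+
+ObjectName messagesIn = new ObjectName(
+    "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsMessagesInPerSec\"");
+Object rate = mbeans.getAttribute(messagesIn, "OneMinuteRate");   // assumed Yammer meter attribute
+System.out.println("messages in per sec (1 min rate): " + rate);
+connector.close();
+</pre>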
+
+We recommend monitoring GC time and various server stats such as CPU utilization, I/O service time, etc.
+
+On the client side, we recommend monitoring the message/byte rate (global and per topic), request rate/size/time, and on the consumer side, max lag in messages among all partitions and min fetch request rate. For a consumer to keep up, max lag needs to be less than a threshold and min fetch rate needs to be larger than 0.
+
+<h4>Audit</h4>
+The final alerting we do is on the correctness of the data delivery. We audit that every message that is sent is consumed by all consumers and measure the lag for this to occur. For important topics we alert if a certain completeness is not achieved in a certain time period. The details of this are discussed in KAFKA-260.
+
+<h3><a id="zk">6.6 Zookeeper</a></h3>
+
+<h4><a id="zkversion">Stable version</a></h4>
+At LinkedIn, we are running Zookeeper 3.3.*. Version 3.3.3 has known serious issues regarding ephemeral node deletion and session expirations. After running into those issues in production, we upgraded to 3.3.4 and have been running that smoothly for over a year now.
+
+<h4><a id="zkops">Operationalizing Zookeeper</a></h4>
+Operationally, we do the following for a healthy Zookeeper installation:
+<p>
+Redundancy in the physical/hardware/network layout: try not to put them all in the same rack, use decent (but not extravagant) hardware, and try to keep redundant power and network paths.
+<p>
+I/O segregation: if you do a lot of write-type traffic you'll almost definitely want the transaction logs on a different disk group than application logs and snapshots (writes to the Zookeeper service are synchronous to disk, which can be slow).
+<p>
+Application segregation: Unless you really understand the application patterns of other apps that you want to install on the same box, it can be a good idea to run Zookeeper in isolation (though this can be a balancing act with the capabilities of the hardware).
+<p>
+Use care with virtualization: it can work, depending on your cluster layout, read/write patterns, and SLAs, but the tiny overheads introduced by the virtualization layer can add up and throw off Zookeeper, as it can be very time sensitive.
+<p>
+Zookeeper configuration and monitoring: It's Java, so make sure you give it 'enough' heap space (we usually run them with 3-5G, but that's mostly due to the data set size we have here; unfortunately we don't have a good formula for it). As far as monitoring goes, both JMX and the 4-letter-word (4lw) commands are very useful; they do overlap in some cases (and in those cases we prefer the 4-letter-word commands, as they seem more predictable, or at the very least, they work better with the LI monitoring infrastructure).
+<p>
+Don't overbuild the cluster: large clusters, especially with a write-heavy usage pattern, mean a lot of intra-cluster communication (quorums on the writes and subsequent cluster member updates), but don't underbuild it either (and risk swamping the cluster).
+<p>
+Try to run on a 3-5 node cluster: Zookeeper writes use quorums and inherently that means having an odd number of machines in a cluster. Remember that a 5 node cluster will cause writes to slow down compared to a 3 node cluster, but will allow more fault tolerance.
+<p>
+Overall, we try to keep the Zookeeper system as small as will handle the load (plus standard growth capacity planning) and as simple as possible. We try not to do anything fancy with the configuration or application layout compared to the official release, and we keep it as self-contained as possible. For these reasons, we tend to skip the OS-packaged versions, since they have a tendency to put things in the OS standard hierarchy, which can be 'messy', for want of a better way to word it.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/08/quickstart.html
----------------------------------------------------------------------
diff --git a/08/quickstart.html b/08/quickstart.html
new file mode 100644
index 0000000..ac5f45e
--- /dev/null
+++ b/08/quickstart.html
@@ -0,0 +1,164 @@
+<h3><a id="quickstart">1.3 Quick Start</a></h3>
+
+<h4> Step 1: Download the code </h4>
+
+<a href="../downloads.html" title="Kafka downloads">Download</a> the 0.8 release.
+
+<pre>
+<b>&gt; tar xzf kafka-&lt;VERSION&gt;.tgz</b>
+<b>&gt; cd kafka-&lt;VERSION&gt;</b>
+<b>&gt; ./sbt update</b>
+<b>&gt; ./sbt package</b>
+<b>&gt; ./sbt assembly-package-dependency</b>
+</pre>
+
+This tutorial assumes you are starting on a fresh zookeeper instance with no pre-existing data. If you want to migrate from an existing 0.7 installation you will need to follow the migration instructions.
+
+<h4>Step 2: Start the server</h4>
+
+<p>
+Kafka uses zookeeper so you need to first start a zookeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node zookeeper instance.
+
+<pre>
+<b>&gt; bin/zookeeper-server-start.sh config/zookeeper.properties</b>
+[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
+...
+</pre>
+
+Now start the Kafka server:
+<pre>
+<b>&gt; bin/kafka-server-start.sh config/server.properties</b>
+[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
+[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
+...
+</pre>
+
+<h4>Step 3: Create a topic</h4>
+
+Let's create a topic named "test" with a single partition and only one replica:
+<pre>
+&gt; <b>bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test</b>
+</pre>
+
+We can now see that topic if we run the list topic command:
+<pre>
+&gt; <b>bin/kafka-list-topic.sh --zookeeper localhost:2181</b>
+</pre>
+Alternatively, you can also configure your brokers to auto-create topics when a non-existent topic is published to.
+
+<h4>Step 4: Send some messages</h4>
+
+Kafka comes with a command line client that will take input from a file or standard in and send it out as messages to the Kafka cluster. By default each line will be sent as a separate message.
+<p>
+Run the producer and then type a few messages to send to the server.
+
+<pre>
+&gt; <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test</b> 
+This is a message
+This is another message
+</pre>
+
+<h4>Step 5: Start a consumer</h4>
+
+Kafka also has a command line consumer that will dump out messages to standard out.
+
+<pre>
+<b>&gt; bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning</b>
+This is a message
+This is another message
+</pre>
+<p>
+If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
+</p>
+<p>
+All the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.	
+</p>
+
+<h4>Step 6: Setting up a multi-broker cluster</h4>
+
+So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let's expand our cluster to three nodes (still all on our local machine).
+<p>
+First we make a config file for each of the brokers:
+<pre>
+<b>&gt; cp config/server.properties config/server-1.properties 
+&gt; cp config/server.properties config/server-2.properties</b>
+</pre>
+
+Now edit these new files and set the following properties:
+<pre>
+ 
+config/server-1.properties:
+    broker.id=1
+    port=9093
+    log.dir=/tmp/kafka-logs-1
+ 
+config/server-2.properties:
+    broker.id=2
+    port=9094
+    log.dir=/tmp/kafka-logs-2
+</pre>
+The <code>broker.id</code> property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from trying to all register on the same port or overwrite each other's data.
+<p>
+We already have Zookeeper and our single node started, so we just need to start the two new nodes. However, this time we have to override the JMX port used by java too to avoid clashes with the running node:
+<pre>
+<b>&gt; JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &amp;</b>
+...
+<b>&gt; JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &amp;</b>
+...
+</pre>
+
+Now create a new topic with a replication factor of three:
+<pre>
+&gt; <b>bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 3 --partition 1 --topic my-replicated-topic</b>
+</pre>
+
+Okay, but now that we have a cluster, how can we know which broker is doing what? To see that, run the "list topics" command:
+<pre>
+&gt; <b>bin/kafka-list-topic.sh --zookeeper localhost:2181</b>
+topic: my-replicated-topic  partition: 0  leader: 1  replicas: 1,2,0  isr: 1,2,0
+topic: test	                partition: 0  leader: 0  replicas: 0      isr: 0
+</pre>
+Here is an explanation of the output:
+<ul>
+  <li>"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
+  <li>"replicas" is the list of nodes that are supposed to serve the log for this partition, regardless of whether they are currently alive.
+  <li>"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
+</ul> 
+Note that both topics we created have only a single partition (partition 0). The original topic is not replicated, so it is only present on its leader (node 0), while the replicated topic is present on all three nodes, with node 1 currently acting as leader and all replicas in sync.
+<p>
+As before, let's publish a few messages:
+<pre>
+&gt; <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic</b>
+...
+<b>my test message 1</b>
+<b>my test message 2</b>
+<b>^C</b> 
+</pre>
+Now let's consume these messages:
+<pre>
+<b>&gt; bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic</b>
+...
+my test message 1
+my test message 2
+<b>^C</b>
+</pre>
+
+Now let's test out fault-tolerance. Kill the broker acting as leader for this topic's only partition:
+<pre>
+&gt; <b>pkill -9 -f server-1.properties</b>
+</pre>
+Leadership should switch to one of the slaves:
+<pre>
+&gt; <b>bin/kafka-list-topic.sh --zookeeper localhost:2181</b>
+...
+topic: my-replicated-topic	partition: 0	leader: 2	replicas: 1,2,0	isr: 2
+topic: test	partition: 0	leader: 0	replicas: 0	isr: 0
+</pre>
+And the messages should still be available for consumption even though the leader that took the writes originally is down:
+<pre>
+<b>&gt; bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic</b>
+...
+my test message 1
+my test message 2
+<b>^C</b>
+</pre>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/08/tools.html
----------------------------------------------------------------------
diff --git a/08/tools.html b/08/tools.html
new file mode 100644
index 0000000..33ea357
--- /dev/null
+++ b/08/tools.html
@@ -0,0 +1,7 @@
+<h4>Mirroring data between clusters</h4>
+We have a tool that runs a continuous copy between two clusters. The clusters are completely independent and the topology need not match (you can have a different number of brokers and a different number of partitions). Offsets and partitioning are currently not preserved by this tool as it is meant for geographical replication rather than backup.
+
+Documentation <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+%28MirrorMaker%29">here</a>.
+
+<h4>Administrative tools</h4>
+A set of tools for managing an 0.8 cluster is described <a href="https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools">here</a>.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/08/upgrade.html
----------------------------------------------------------------------
diff --git a/08/upgrade.html b/08/upgrade.html
new file mode 100644
index 0000000..5dc7c6f
--- /dev/null
+++ b/08/upgrade.html
@@ -0,0 +1,2 @@
+<h3><a id="migrationtool">Upgrading from 0.7</a></h3>
+Since 0.8 is not backward compatible with 0.7.x, we provide a tool for migrating data in an 0.7 cluster to an 0.8 cluster. Details of the tool can be found <a href="https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8">here</a>.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/08/uses.html
----------------------------------------------------------------------
diff --git a/08/uses.html b/08/uses.html
new file mode 100644
index 0000000..9b95347
--- /dev/null
+++ b/08/uses.html
@@ -0,0 +1,29 @@
+<h3><a id="uses">1.2 Use Cases</a></h3>
+
+Here is a description of a few of the popular use cases for Apache Kafka. For an overview of a number of these areas in action, see <a href="http://sites.computer.org/debull/A12june/pipeline.pdf">this paper</a>.
+
+<h4>Messaging</h4>
+
+Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc). In comparison to most messaging systems, Kafka has better throughput, built-in partitioning, replication, and fault tolerance, which makes it a good solution for large-scale message processing applications.
+<p>
+In our experience messaging uses are often comparatively low-throughput, but may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides.
+
+<h4>Website Activity Tracking</h4>
+
+The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.
+<p>
+Activity tracking is often very high volume as many activity messages are generated for each user page view.
+
+<h4>Metrics</h4>
+
+Kafka is often used for operational monitoring data pipelines. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.
+
+<h4>Log Aggregation</h4>
+
+Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing. Kafka abstracts away the details of files and gives a cleaner abstraction of log or event data as a stream of messages. This allows for lower-latency processing and easier support for multiple data sources and distributed data consumption.
+
+In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication, and much lower end-to-end latency.
+
+<h4>Stream Processing</h4>
+
+Many users end up doing stage-wise processing of data where data is consumed from topics of raw data and then aggregated, enriched, or otherwise transformed into new Kafka topics for further consumption. For example, a processing flow for article recommendation might crawl article content from RSS feeds and publish it to an "articles" topic; further processing might help normalize or deduplicate this content to a topic of cleaned article content; a final stage might attempt to match this content to users. This creates a graph of real-time data flow out of the individual topics. The <a href="https://github.com/nathanmarz/storm">Storm</a> framework is one popular way of implementing some of these transformations.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/ba6c994c/081/api.html
----------------------------------------------------------------------
diff --git a/081/api.html b/081/api.html
new file mode 100644
index 0000000..d85d2c0
--- /dev/null
+++ b/081/api.html
@@ -0,0 +1,148 @@
+<h3><a id="producerapi">2.1 Producer API</a></h3>
+<pre>
+/**
+ *  V: type of the message
+ *  K: type of the optional key associated with the message
+ */
+class kafka.javaapi.producer.Producer&lt;K,V&gt; {
+  public Producer(ProducerConfig config);
+
+  /**
+   * Sends the data to a single topic, partitioned by key, using either the
+   * synchronous or the asynchronous producer
+   * @param message the producer data object that encapsulates the topic, key and message data
+   */
+  public void send(KeyedMessage&lt;K,V&gt; message);
+
+  /**
+   * Use this API to send data to multiple topics
+   * @param messages list of producer data objects that encapsulate the topic, key and message data
+   */
+  public void send(List&lt;KeyedMessage&lt;K,V&gt;&gt; messages);
+
+  /**
+   * Close API to close the producer pool connections to all Kafka brokers.
+   */
+  public void close();
+}
+
+</pre>
+You can follow
+<a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example" title="Kafka 0.8 producer example">this example</a> to learn how to use the producer api.
+
+<h3><a id="highlevelconsumerapi">2.2 High Level Consumer API</a></h3>
+<pre>
+class Consumer {
+  /**
+   *  Create a ConsumerConnector
+   *
+   *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
+   *                 connection string zookeeper.connect.
+   */
+  public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
+}
+
+/**
+ *  V: type of the message
+ *  K: type of the optional key associated with the message
+ */
+public interface kafka.javaapi.consumer.ConsumerConnector {
+  /**
+   *  Create a list of message streams of type T for each topic.
+   *
+   *  @param topicCountMap  a map of (topic, #streams) pair
+   *  @param decoder a decoder that converts from Message to T
+   *  @return a map of (topic, list of  KafkaStream) pairs.
+   *          The number of items in the list is #streams. Each stream supports
+   *          an iterator over message/metadata pairs.
+   */
+  public &lt;K,V&gt; Map&lt;String, List&lt;KafkaStream&lt;K,V&gt;&gt;&gt;
+    createMessageStreams(Map&lt;String, Integer&gt; topicCountMap, Decoder&lt;K&gt; keyDecoder, Decoder&lt;V&gt; valueDecoder);
+
+  /**
+   *  Create a list of message streams of type T for each topic, using the default decoder.
+   */
+  public Map&lt;String, List&lt;KafkaStream&lt;byte[], byte[]&gt;&gt;&gt; createMessageStreams(Map&lt;String, Integer&gt; topicCountMap);
+
+  /**
+   *  Create a list of message streams for topics matching a wildcard.
+   *
+   *  @param topicFilter a TopicFilter that specifies which topics to
+   *                    subscribe to (encapsulates a whitelist or a blacklist).
+   *  @param numStreams the number of message streams to return.
+   *  @param keyDecoder a decoder that decodes the message key
+   *  @param valueDecoder a decoder that decodes the message itself
+   *  @return a list of KafkaStream. Each stream supports an
+   *          iterator over its MessageAndMetadata elements.
+   */
+  public &lt;K,V&gt; List&lt;KafkaStream&lt;K,V&gt;&gt;
+    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder&lt;K&gt; keyDecoder, Decoder&lt;V&gt; valueDecoder);
+
+  /**
+   *  Create a list of message streams for topics matching a wildcard, using the default decoder.
+   */
+  public List&lt;KafkaStream&lt;byte[], byte[]&gt;&gt; createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
+
+  /**
+   *  Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.
+   */
+  public List&lt;KafkaStream&lt;byte[], byte[]&gt;&gt; createMessageStreamsByFilter(TopicFilter topicFilter);
+
+  /**
+   *  Commit the offsets of all topic/partitions connected by this connector.
+   */
+  public void commitOffsets();
+
+  /**
+   *  Shut down the connector
+   */
+  public void shutdown();
+}
+
+</pre>
+You can follow
+<a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example" title="Kafka 0.8 consumer example">this example</a> to learn how to use the high level consumer api.
+<h3><a id="simpleconsumerapi">2.3 Simple Consumer API</a></h3>
+<pre>
+class kafka.javaapi.consumer.SimpleConsumer {
+  /**
+   *  Fetch a set of messages from a topic.
+   *
+   *  @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+   *  @return a set of fetched messages
+   */
+  public FetchResponse fetch(kafka.javaapi.FetchRequest request);
+
+  /**
+   *  Fetch metadata for a sequence of topics.
+   *
+   *  @param request specifies the versionId, clientId, sequence of topics.
+   *  @return metadata for each topic in the request.
+   */
+  public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);
+
+  /**
+   *  Get a list of valid offsets (up to maxSize) before the given time.
+   *
+   *  @param request a [[kafka.javaapi.OffsetRequest]] object.
+   *  @return a [[kafka.javaapi.OffsetResponse]] object.
+   */
+  public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
+
+  /**
+   * Close the SimpleConsumer.
+   */
+  public void close();
+}
+</pre>
+For most applications, the high level consumer API is good enough. Some applications want features not yet exposed by the high level consumer (e.g., setting the initial offset when restarting the consumer). They can instead use our low level SimpleConsumer API. The logic will be a bit more complicated, and you can follow the example
+<a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example" title="Kafka 0.8 SimpleConsumer example">here</a>.
+
+<h3><a id="kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a></h3>
+<p>
+Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).
+</p>
+
+<p>
+Usage information on the Hadoop consumer can be found <a href="https://github.com/linkedin/camus/">here</a>.
+</p>

