kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1574659 [3/5] - in /kafka/site: 08/ 081/ diagrams/ images/
Date Wed, 05 Mar 2014 21:06:37 GMT
Added: kafka/site/081/migration.html
URL: http://svn.apache.org/viewvc/kafka/site/081/migration.html?rev=1574659&view=auto
==============================================================================
--- kafka/site/081/migration.html (added)
+++ kafka/site/081/migration.html Wed Mar  5 21:06:36 2014
@@ -0,0 +1,17 @@
+<!--#include virtual="../includes/header.html" -->
+<h2>Migrating from 0.7.x to 0.8</h2>
+
+0.8 is our first (and hopefully last) release with a non-backwards-compatible wire protocol,
zookeeper layout, and on-disk data format. This was a chance for us to clean up a lot of cruft
and start fresh. This means performing a no-downtime upgrade is more painful than normal&mdash;you
cannot just swap in the new code in-place.
+
+<h3>Migration Steps</h3>
+
+<ol>
+	<li>Setup a new cluster running 0.8.
+	<li>Use the 0.7 to 0.8 <a href="tools.html">migration tool</a> to mirror
data from the 0.7 cluster into the 0.8 cluster.
+	<li>When the 0.8 cluster is fully caught up, redeploy all data <i>consumers</i>
running the 0.8 client and reading from the 0.8 cluster.
+	<li>Finally migrate all 0.7 producers to 0.8 client publishing data to the 0.8 cluster.
+	<li>Decomission the 0.7 cluster.
+	<li>Drink.
+</ol>
+
+<!--#include virtual="../includes/footer.html" -->
\ No newline at end of file

Added: kafka/site/081/ops.html
URL: http://svn.apache.org/viewvc/kafka/site/081/ops.html?rev=1574659&view=auto
==============================================================================
--- kafka/site/081/ops.html (added)
+++ kafka/site/081/ops.html Wed Mar  5 21:06:36 2014
@@ -0,0 +1,329 @@
+Here is some information on actually running Kafka as a production system based on usage
and experience at LinkedIn. Please send us any additional tips you know of.
+
+<h3><a id="datacenters">6.1 Datacenters</a></h3>
+Some deployments will need to manage a data pipeline that spans multiple datacenters. Our
approach to this is to deploy a local Kafka cluster in each datacenter and machines in each
location interact only with their local cluster.
+<p>
+For applications that need a global view of all data we use the <a href="/08/tools.html">mirror
maker tool</a> to provide clusters which have aggregate data mirrored from all datacenters.
These aggregator clusters are used for reads by applications that require this.
+<p>
+Likewise in order to support data load into Hadoop which resides in separate facilities we
provide local read-only clusters that mirror the production data centers in the facilities
where this data load occurs.
+<p>
+This allows each facility to stand alone and operate even if the inter-datacenter links are
unavailable: when this occurs the mirroring falls behind until the link is restored at which
time it catches up.
+<p>
+This deployment pattern allows datacenters to act as independent entities and allows us to
manage and tune inter-datacenter replication centrally.
+<p>
+This is not the only possible deployment pattern. It is possible to read from or write to
a remote Kafka cluster over the WAN though TCP tuning will be necessary for high-latency links.
+<p>
+It is generally not advisable to run a single Kafka cluster that spans multiple datacenters
as this will incur very high replication latency both for Kafka writes and Zookeeper writes
and neither Kafka nor Zookeeper will remain available if the network partitions.
+
+<h3><a id="config">6.2 Kafka Configuration</a></h3>
+Kafka 0.8 is the version we currently run. We are currently running with replication but
with producers acks = 1. 
+<P>
+<h4><a id="serverconfig">Important Server Configurations</a></h4>
+
+The most important server configurations for performance are those that control the disk
flush rate. The more often data is flushed to disk, the more "seek-bound" Kafka will be and
the lower the throughput. However very low application flush rates can lead to high latency
when the flush finally does occur (because of the volume of data that must be flushed). See
the section below on application versus OS flush.
+
+<h4><a id="clientconfig">Important Client Configurations</a></h4>
+The most important producer configurations control
+<ul>
+	<li>compression</li>
+	<li>sync vs async production></li>
+	<li>batch size (for async producers)</li>
+</ul>
+The most important consumer configuration is the fetch size.
+<p>
+All configurations are documented in the <a href="configuration.html">configuration</a>
page.
+<p>
+<h4><a id="prodconfig">A Production Server Config</a></h4>
+Here is our server production server configuration:
+<pre>
+# Replication configurations
+num.replica.fetchers=4
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.high.watermark.checkpoint.interval.ms=5000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+
+# Log configuration
+num.partitions=8
+message.max.bytes=1000000
+auto.create.topics.enable=true
+log.index.interval.bytes=4096
+log.index.size.max.bytes=10485760
+log.retention.hours=168
+log.flush.interval.ms=10000
+log.flush.interval.messages=20000
+log.flush.scheduler.interval.ms=2000
+log.roll.hours=168
+log.cleanup.interval.mins=30
+log.segment.bytes=1073741824
+
+# ZK configuration
+zk.connection.timeout.ms=6000
+zk.sync.time.ms=2000
+
+# Socket server configuration
+num.io.threads=8
+num.network.threads=8
+socket.request.max.bytes=104857600
+socket.receive.buffer.bytes=1048576
+socket.send.buffer.bytes=1048576
+queued.max.requests=16
+fetch.purgatory.purge.interval.requests=100
+producer.purgatory.purge.interval.requests=100
+</pre>
+
+Our client configuration varies a fair amount between different use cases.
+
+<h3><a id="java">Java Version</a></h3>
+Any version of Java 1.6 or later should work fine, we are using 1.6.0_21.
+
+Here are our command line options:
+<pre>
+java -server -Xms3072m -Xmx3072m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC

+     -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSConcurrentMTEnabled -XX:+CMSScavengeBeforeRemark

+     -XX:CMSInitiatingOccupancyFraction=30 
+     -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution 
+     -Xloggc:logs/gc.log -Djava.awt.headless=true
+     -Dcom.sun.management.jmxremote -classpath &lt;long list of jars&gt; the.actual.Class
+	</pre>
+	
+<h3><a id="hwandos">6.4 Hardware and OS</a></h3>
+We are using dual quad-core Intel Xeon machines with 24GB of memory.
+<p>
+You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope
estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute
your memory need as write_throughput*30.
+<p>
+The disk throughput is important. We have 8x7200 rpm SATA drives. In general disk throughput
is the performance bottleneck, and more disks is more better. Depending on how you configure
flush behavior you may or may not benefit from more expensive disks (if you force flush often
then higher RPM SAS drives may be better).
+
+<h4><a id="os">OS</a></h4>
+Kafka should run well on any unix system and has been tested on Linux and Solaris.
+<p>
+We have seen a few issues running on Windows and Windows is not currently a well supported
platform though we would be happy to change that.
+<p>
+You likely don't need to do much OS-level tuning though there are a few things that will
help performance. 
+<p>
+Two configurations that may be important:
+<ul>
+	<li>We upped the number of file descriptors since we have lots of topics and lots
of connections.
+	<li>We upped the max socket buffer size to enable high-performance data transfer between
data centers <a href="http://www.psc.edu/index.php/networking/641-tcp-tune">described
here</a>.
+</ul>
+
+<h4><a id="diskandfs">Disks and Filesystem</a></h4>
+We recommend using multiple drives to get good throughput and not sharing the same drives
used for Kafka data with application logs or other OS filesystem activity to ensure good latency.
As of 0.8 you can either RAID these drives together into a single volume or format and mount
each drive as its own directory. Since Kafka has replication the redundancy provided by RAID
can also be provided at the application level. This choice has several tradeoffs.
+<p>
+If you configure multiple data directories partitions will be assigned round-robin to data
directories. Each partition will be entirely in one of the data directories. If data is not
well balanced among partitions this can lead to load imbalance between disks.
+<p>
+RAID can potentially do better at balancing load between disks (although it doesn't always
seem to) because it balances load at a lower level. The primary downside of RAID is that it
is usually a big performance hit for write throughput and reduces the available disk space.
+<p>
+Another potential benefit of RAID is the ability to tolerate disk failures. However our experience
has been that rebuilding the RAID array is so I/O intensive that it effectively disables the
server, so this does not provide much real availability improvement.
+
+<h4><a id="appvsosflush">Application vs. OS Flush Management</a></h4>
+Kafka always immediately writes all data to the filesystem and supports the ability to configure
the flush policy that controls when data is forced out of the OS cache and onto disk using
the and flush. This flush policy can be controlled to force data to disk after a period of
time or after a certain number of messages has been written. There are several choices in
this configuration.
+<p>
+Kafka must eventually call fsync to know that data was flushed. When recovering from a crash
for any log segment not known to be fsync'd Kafka will check the integrity of each message
by checking its CRC and also rebuild the accompanying offset index file as part of the recovery
process executed on startup.
+<p>
+Note that durability in Kafka does not require syncing data to disk, as a failed node will
always recover from its replicas.
+<p>
+We recommend using the default flush settings which disable application fsync entirely. This
means relying on the background flush done by the OS and Kafka's own background flush. This
provides the best of all worlds for most uses: no knobs to tune, great throughput and latency,
and full recovery guarantees. We generally feel that the guarantees provided by replication
are stronger than sync to local disk, however the paranoid still may prefer having both and
application level fsync policies are still supported.
+<p>
+The drawback of using application level flush settings are that this is less efficient in
it's disk usage pattern (it gives the OS less leeway to re-order writes) and it can introduce
latency as fsync in most Linux filesystems blocks writes to the file whereas the background
flushing does much more granular page-level locking.
+<p>
+In general you don't need to do any low-level tuning of the filesystem, but in the next few
sections we will go over some of this in case it is useful.
+
+<h4><a id="linuxflush">Understanding Linux OS Flush Behavior</a></h4>
+
+In Linux, data written to the filesystem is maintained in <a href="http://en.wikipedia.org/wiki/Page_cache">pagecache</a>
until it must be written out to disk (due to an application-level fsync or the OS's own flush
policy). The flushing of data is done by a set of background threads called pdflush (or in
post 2.6.32 kernels "flusher threads").
+<p>
+Pdflush has a configurable policy that controls how much dirty data can be maintained in
cache and for how long before it must be written back to disk. This policy is described <a
href="http://www.westnet.com/~gsmith/content/linux-pdflush.htm">here</a>. When Pdflush
cannot keep up with the rate of data being written it will eventually cause the writing process
to block incurring latency in the writes to slow down the accumulation of data.
+<p>
+You can see the current state of OS memory usage by doing
+<pre>
+  cat /proc/meminfo
+</pre>
+The meaning of these values are described in the link above.
+<p>
+Using pagecache has several advantages over an in-process cache for storing data that will
be written out to disk:
+<ul>
+  <li>The I/O scheduler will batch together consecutive small writes into bigger physical
writes which improves throughput.
+  <li>The I/O scheduler will attempt to re-sequence writes to minimize movement of
the disk head which improves throughput.
+  <li>It automatically uses all the free memory on the machine
+</ul>
+
+<h4><a id="ext4">Ext4 Notes</a></h4>
+Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS supposedly handle
locking during fsync better. We have only tried Ext4, though.
+<p>
+It is not necessary to tune these settings, however those wanting to optimize performance
have a few knobs that will help:
+<ul>
+  <li>data=writeback: Ext4 defaults to data=ordered which puts a strong order on some
writes. Kafka does not require this ordering as it does very paranoid data recovery on all
unflushed log. This setting removes the ordering constraint and seems to significantly reduce
latency.
+  <li>Disabling journaling: Journaling is a tradeoff: it makes reboots faster after
server crashes but it introduces a great deal of additional locking which adds variance to
write performance. Those who don't care about reboot time and want to reduce a major source
of write latency spikes can turn off journaling entirely.
+  <li>commit=num_secs: This tunes the frequency with which ext4 commits to its metadata
journal. Setting this to a lower value reduces the loss of unflushed data during a crash.
Setting this to a higher value will improve throughput.
+  <li>nobh: This setting controls additional ordering guarantees when using data=writeback
mode. This should be safe with Kafka as we do not depend on write ordering and improves throughput
and latency.
+  <li>delalloc: Delayed allocation means that the filesystem avoid allocating any blocks
until the physical write occurs. This allows ext4 to allocate a large extent instead of smaller
pages and helps ensure the data is written sequentially. This feature is great for throughput.
It does seem to involve some locking in the filesystem which adds a bit of latency variance.
+</ul>
+	
+<h3><a id="monitoring">6.5 Monitoring</a></h3>
+
+Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This can
be configured to report stats using pluggable stats reporters to hook up to your monitoring
system.
+<p>
+The easiest way to see the available metrics to fire up jconsole and point it at a running
kafka client or server; this will all browsing all metrics with JMX.
+<p>
+We pay particular we do graphing and alerting on the following metrics:
+<table class="data-table">
+<tbody><tr>
+      <th>Description</th>
+      <th>Mbean name</th>
+      <th>Normal value</th>
+    </tr>
+    <tr>
+      <td>Message in rate</td>
+      <td>"kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Byte in rate</td>
+      <td>"kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Request rate</td>
+      <td>"kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Byte out rate</td>
+      <td>"kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Log flush rate and time</td>
+      <td>"kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td># of under replicated partitions (|ISR| &lt |all replicas|)</td>
+      <td>"kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager"</td>
+      <td>0</td>
+    </tr>
+    <tr>
+      <td>Is controller active on broker</td>
+      <td>"kafka.controller":name="ActiveControllerCount",type="KafkaController"</td>
+      <td>only one broker in the cluster should have 1</td>
+    </tr>
+    <tr>
+      <td>Leader election rate</td>
+      <td>"kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats"</td>
+      <td>non-zero when there are broker failures</td>
+    </tr>
+    <tr>
+      <td>Unclean leader election rate</td>
+      <td>"kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats"</td>
+      <td>0</td>
+    </tr>
+    <tr>
+      <td>Partition counts</td>
+      <td>"kafka.server":name="PartitionCount",type="ReplicaManager"</td>
+      <td>mostly even across brokers</td>
+    </tr>
+    <tr>
+      <td>Leader replica counts</td>
+      <td>"kafka.server":name="LeaderCount",type="ReplicaManager"</td>
+      <td>mostly even across brokers</td>
+    </tr>
+    <tr>
+      <td>ISR shrink rate</td>
+      <td>"kafka.server":name="ISRShrinksPerSec",type="ReplicaManager"</td>
+      <td>If a broker goes down, ISR for some of the partitions will
+	shrink. When that broker is up again, ISR will be expanded
+	once the replicas are fully caught up. Other than that, the
+	expected value for both ISR shrink rate and expansion rate is 0. </td>
+    </tr>
+    <tr>
+      <td>ISR expansion rate</td>
+      <td>"kafka.server":name="ISRExpandsPerSec",type="ReplicaManager"</td>
+      <td>See above</td>
+    </tr>
+    <tr>
+      <td>Max lag in messages btw follower and leader replicas</td>
+      <td>"kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager"</td>
+      <td>&lt replica.lag.max.messages</td>
+    </tr>
+    <tr>
+      <td>Lag in messages per follower replica</td>
+      <td>"kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics"</td>
+      <td>&lt replica.lag.max.messages</td>
+    </tr>
+    <tr>
+      <td>Requests waiting in the producer purgatory</td>
+      <td>"kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory"</td>
+      <td>non-zero if ack=-1 is used</td>
+    </tr>
+    <tr>
+      <td>Requests waiting in the fetch purgatory</td>
+      <td>"kafka.server":name="PurgatorySize",type="FetchRequestPurgatory"</td>
+      <td>size depends on fetch.wait.max.ms in the consumer</td>
+    </tr>
+    <tr>
+      <td>Request total time</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics"</td>
+      <td>broken into queue, local, remote and response send time</td>
+    </tr>
+    <tr>
+      <td>Time the request waiting in the request queue</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Time the request being processed at the leader</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Time the request waits for the follower</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics"</td>
+      <td>non-zero for produce requests when ack=-1</td>
+    </tr>
+    <tr>
+      <td>Time to send the response</td>
+      <td>"kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"</td>
+      <td></td>
+    </tr>
+    <tr>
+      <td>Number of messages the consumer lags behind the producer by</td>
+      <td>"kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager"</td>
+      <td></td>
+    </tr>
+</tbody></table>
+
+We recommend monitor GC time and other stats and various server stats such as CPU utilization,
I/O service time, etc.
+
+On the client side, we recommend monitor the message/byte rate (global and per topic), request
rate/size/time, and on the consumer side, max lag in messages among all partitions and min
fetch request rate. For a consumer to keep up, max lag needs to be less than a threshold and
min fetch rate needs to be larger than 0.
+
+<h4>Audit</h4>
+The final alerting we do is on the correctness of the data delivery. We audit that every
message that is sent is consumed by all consumers and measure the lag for this to occur. For
important topics we alert if a certain completeness is not achieved in a certain time period.
The details of this are discussed in KAFKA-260.
+
+<h3><a id="zk">6.6 Zookeeper</a></h3>
+
+<h4><a id="zkversion">Stable version</a></h4>
+At LinkedIn, we are running Zookeeper 3.3.*. Version 3.3.3 has known serious issues regarding
ephemeral node deletion and session expirations. After running into those issues in production,
we upgraded to 3.3.4 and have been running that smoothly for over a year now.
+
+<h4><a id="zkops">Operationalizing Zookeeper</a></h4>
+Operationally, we do the following for a healthy Zookeeper installation:
+<p>
+Redundancy in the physical/hardware/network layout: try not to put them all in the same rack,
decent (but don't go nuts) hardware, try to keep redundant power and network paths, etc
+<p>
+I/O segregation: if you do a lot of write type traffic you'll almost definitely want the
transaction logs on a different disk group than application logs and snapshots (the write
to the Zookeeper service has a synchronous write to disk, which can be slow).
+<p>
+Application segregation: Unless you really understand the application patterns of other apps
that you want to install on the same box, it can be a good idea to run Zookeeper in isolation
(though this can be a balancing act with the capabilities of the hardware).
+Use care with virtualization: It can work, depending on your cluster layout and read/write
patterns and SLAs, but the tiny overheads introduced by the virtualization layer can add up
and throw off Zookeeper, as it can be very time sensitive
+<p>
+Zookeeper configuration and monitoring: It's java, make sure you give it 'enough' heap space
(We usually run them with 3-5G, but that's mostly due to the data set size we have here).
Unfortunately we don't have a good formula for it. As far as monitoring, both JMZ and the
4 letter commands are very useful, they do overlap in some cases (and in those cases we prefer
the 4 letter commands, they seem more predictable, or at the very least, they work better
with the LI monitoring infrastructure)
+Don't overbuild the cluster: large clusters, especially in a write heavy usage pattern, means
a lot of intracluster communication (quorums on the writes and subsequent cluster member updates),
but don't underbuild it (and risk swamping the cluster).
+<p>
+Try to run on a 3-5 node cluster: Zookeeper writes use quorums and inherently that means
having an odd number of machines in a cluster. Remember that a 5 node cluster will cause writes
to slow down compared to a 3 node cluster, but will allow more fault tolerance.
+<p>
+Overall, we try to keep the Zookeeper system as small as will handle the load (plus standard
growth capacity planning) and as simple as possible. We try not to do anything fancy with
the configuration or application layout as compared to the official release as well as keep
it as self contained as possible. For these reasons, we tend to skip the OS packaged versions,
since it has a tendency to try to put things in the OS standard hierarchy, which can be 'messy',
for want of a better way to word it.

Added: kafka/site/081/quickstart.html
URL: http://svn.apache.org/viewvc/kafka/site/081/quickstart.html?rev=1574659&view=auto
==============================================================================
--- kafka/site/081/quickstart.html (added)
+++ kafka/site/081/quickstart.html Wed Mar  5 21:06:36 2014
@@ -0,0 +1,175 @@
+<h3><a id="quickstart">1.3 Quick Start</a></h3>
+
+<h4> Step 1: Download the code </h4>
+
+<a href="../downloads.html" title="Kafka downloads">Download</a> the 0.8 release.
+
+<pre>
+<b>&gt; tar xzf kafka-&lt;VERSION&gt;.tgz</b>
+<b>&gt; cd kafka-&lt;VERSION&gt;</b>
+<b>&gt; ./sbt update</b>
+<b>&gt; ./sbt package</b>
+<b>&gt; ./sbt assembly-package-dependency</b>
+</pre>
+
+This tutorial assumes you are starting on a fresh ZooKeeper instance and Kafka server with
no pre-existing data.
+
+<h4>Step 2: Start the server</h4>
+
+<p>
+Kafka uses zookeeper so you need to first start a zookeeper server if you don't already have
one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node
zookeeper instance.
+
+<pre>
+<b>&gt; bin/zookeeper-server-start.sh config/zookeeper.properties</b>
+[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
+...
+</pre>
+
+Now start the Kafka server:
+<pre>
+<b>&gt; bin/kafka-server-start.sh config/server.properties</b>
+[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
+[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576
(kafka.utils.VerifiableProperties)
+...
+</pre>
+
+<h4>Step 3: Create a topic</h4>
+
+Let's create a topic named "test" with a single partition and only one replica:
+<pre>
+&gt; <b>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor
1 --partitions 1 --topic test</b>
+</pre>
+
+We can now see that topic if we run the list topic command:
+<pre>
+&gt; <b>bin/kafka-topics.sh --list --zookeeper localhost:2181</b>
+test
+</pre>
+Alternatively, instead of manually creating topics you can also configure your brokers to
auto-create topics when a non-existent topic is published to.
+
+<h4>Step 4: Send some messages</h4>
+
+Kafka comes with a command line client that will take input from a file or from standard
input and send it out as messages to the Kafka cluster. By default each line will be sent
as a separate message.
+<p>
+Run the producer and then type a few messages into the console to send to the server.
+
+<pre>
+&gt; <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test</b>

+This is a message
+This is another message
+</pre>
+
+<h4>Step 5: Start a consumer</h4>
+
+Kafka also has a command line consumer that will dump out messages to standard output.
+
+<pre>
+<b>&gt; bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning</b>
+This is a message
+This is another message
+</pre>
+<p>
+If you have each of the above commands running in a different terminal then you should now
be able to type messages into the producer terminal and see them appear in the consumer terminal.
+</p>
+<p>
+All of the command line tools have additional options; running the command with no arguments
will display usage information documenting them in more detail.	
+</p>
+
+<h4>Step 6: Setting up a multi-broker cluster</h4>
+
+So far we have been running against a single broker, but that's no fun. For Kafka, a single
broker is just a cluster of size one, so nothing much changes other than starting a few more
broker instances. But just to get feel for it, let's expand our cluster to three nodes (still
all on our local machine).
+<p>
+First we make a config file for each of the brokers:
+<pre>
+<b>&gt; cp config/server.properties config/server-1.properties 
+&gt; cp config/server.properties config/server-2.properties</b>
+</pre>
+
+Now edit these new files and set the following properties:
+<pre>
+ 
+config/server-1.properties:
+    broker.id=1
+    port=9093
+    log.dir=/tmp/kafka-logs-1
+ 
+config/server-2.properties:
+    broker.id=2
+    port=9094
+    log.dir=/tmp/kafka-logs-2
+</pre>
+The <code>broker.id</code> property is the unique and permanent name of each
node in the cluster. We have to override the port and log directory only because we are running
these all on the same machine and we want to keep the brokers from all trying to register
on the same port or overwrite each others data.
+<p>
+We already have Zookeeper and our single node started, so we just need to start the two new
nodes:
+<pre>
+<b>&gt; bin/kafka-server-start.sh config/server-1.properties &amp;</b>
+...
+<b>&gt; bin/kafka-server-start.sh config/server-2.properties &amp;</b>
+...
+</pre>
+
+Now create a new topic with a replication factor of three:
+<pre>
+&gt; <b>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor
3 --partitions 1 --topic my-replicated-topic</b>
+</pre>
+
+Okay but now that we have a cluster how can we know which broker is doing what? To see that
run the "describe topics" command:
+<pre>
+&gt; <b>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic</b>
+Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
+	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
+</pre>
+Here is an explanation of output. The first line gives a summary of all the partitions, each
additional line gives information about one partition. Since we have only two partitions for
this topic there are only two lines.
+<ul>
+  <li>"leader" is the node responsible for all reads and writes for the given partition.
Each node will be the leader for a randomly selected portion of the partitions.
+  <li>"replicas" is the list of nodes that replicate the log for this partition regardless
of whether they are the leader or even if they are currently alive.
+  <li>"isr" is the set of "in-sync" replicas. This is the subset of the replicas list
that is currently alive and caught-up to the leader.
+</ul> 
+Note that in my example node 1 is the leader for the only partition of the topic.
+<p>
+We can run the same command on the original topic we created to see where it is:
+<pre>
+&gt; <b>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test</b>
+Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
+	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
+</pre>
+So there is no surprise there&mdash;the original topic has no replicas and is on server
0, the only server in our cluster when we created it.
+<p>
+Let's publish a few messages to our new topic:
+<pre>
+&gt; <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic</b>
+...
+<b>my test message 1</b>
+<b>my test message 2</b>
+<b>^C</b> 
+</pre>
+Now consume this message:
+<pre>
+<b>&gt; bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning
--topic my-replicated-topic</b>
+...
+my test message 1
+my test message 2
+<b>^C</b>
+</pre>
+
+Now let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:
+<pre>
+&gt; <b>ps | grep server-1.properties</b>
+<i>7564</i> ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
+&gt; <b>kill -9 7564</b>
+</pre>
+
+Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica
set:
+<pre>
+&gt; <b>bin/kafktopics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic</b>
+Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
+	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0
+</pre>
+But the messages are still be available for consumption even though the leader that took
the writes originally is down:
+<pre>
+<b>&gt; bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning
--topic my-replicated-topic</b>
+...
+my test message 1
+my test message 2
+<b>^C</b>
+</pre>
\ No newline at end of file

Added: kafka/site/081/tools.html
URL: http://svn.apache.org/viewvc/kafka/site/081/tools.html?rev=1574659&view=auto
==============================================================================
--- kafka/site/081/tools.html (added)
+++ kafka/site/081/tools.html Wed Mar  5 21:06:36 2014
@@ -0,0 +1,8 @@
+<h4>Mirroring data between clusters</h4>
+
+We have a tool that runs a continuous copy between two clusters. The clusters are completely
independent and the topology need not match (you can have a different number of brokers and
a different number of partitions). Offsets and partitioning are currently not preserved by
this tool as it is meant for geographical replication rather than backup.
+
+Documentation <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+%28MirrorMaker%29">here</a>.
+
+<h4>Administrative tools</h4>
+A set of tools for managing an 0.8 cluster is described in <a href="https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools">here</a>.

Added: kafka/site/081/upgrade.html
URL: http://svn.apache.org/viewvc/kafka/site/081/upgrade.html?rev=1574659&view=auto
==============================================================================
--- kafka/site/081/upgrade.html (added)
+++ kafka/site/081/upgrade.html Wed Mar  5 21:06:36 2014
@@ -0,0 +1,8 @@
+<h3><a id="upgrade">Upgrading From Previous Versions</a></h3>
+<h4>Upgrading from 0.7</h4>
+
+0.8, the release in which added replication, was our first backwards-incompatible release.
The upgrade from 0.7 to 0.8.x requires a <a href="https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8">special
tool</a> for migration. This migration can be done without downtime.
+
+<h4>Upgrading from 0.8 to 0.8.1</h4>
+
+0.8.1 is fully compatible with 0.8. The upgrade can be done one broker at a time by simply
bringing it down, updating the code, and restarting it.

Added: kafka/site/081/uses.html
URL: http://svn.apache.org/viewvc/kafka/site/081/uses.html?rev=1574659&view=auto
==============================================================================
--- kafka/site/081/uses.html (added)
+++ kafka/site/081/uses.html Wed Mar  5 21:06:36 2014
@@ -0,0 +1,35 @@
+<h3><a id="uses">1.2 Use Cases</a></h3>
+
+Here is a description of a few of the popular use cases for Apache Kafka. For an overview
of a number of these areas in action, see <a href="http://sites.computer.org/debull/A12june/pipeline.pdf">this
paper</a> or <a href="http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying">this
blog post</a>.
+
+<h4>Messaging</h4>
+
+Kafka works well as a replacement for a more traditional message broker. Message brokers
are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed
messages, etc). In comparison to most messaging systems Kafka has better throughput, built-in
partitioning, replication, and fault-tolerance which makes it a good solution for large scale
message processing applications.
+<p>
+In our experience messaging uses are often comparatively low-throughput, but may require
low end-to-end latency and often depend on the strong durability guarantees Kafka provides.
+<p>
+In this domain Kafka is comparable to traditional messaging systems such as <a href="http://activemq.apache.org">ActiveMQ</a>
or <a href="https://www.rabbitmq.com">RabbitMQ</a>.
+
+<h4>Website Activity Tracking</h4>
+
+The original use case for Kafka was to be able to rebuild a user activity tracking pipeline
as a set of real-time publish-subscribe feeds. This means site activity (page views, searches,
or other actions users may take) is published to central topics with one topic per activity
type. These feeds are available for subscription for a range of use cases including real-time
processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems
for offline processing and reporting.
+<p>
+Activity tracking is often very high volume as many activity messages are generated for each
user page view.
+
+<h4>Metrics</h4>
+
+Kafka is often used for operational monitoring data. This involves aggregating statistics
from distributed applications to produce centralized feeds of operational data.
+
+<h4>Log Aggregation</h4>
+
+Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically
collects physical log files off servers and puts them in a central place (a file server or
HDFS perhaps) for processing. Kafka abstracts away the details of files and gives a cleaner
abstraction of log or event data as a stream of messages. This allows for lower-latency processing
and easier support for multiple data sources and distributed data consumption.
+
+In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance,
stronger durability guarantees due to replication, and much lower end-to-end latency.
+
+<h4>Stream Processing</h4>
+
+Many users end up doing stage-wise processing of data where data is consumed from topics
of raw data and then aggregated, enriched, or otherwise transformed into new Kafka topics
for further consumption. For example a processing flow for article recommendation might crawl
article content from RSS feeds and publish it to an "articles" topic; further processing might
help normalize or deduplicate this content to a topic of cleaned article content; a final
stage might attempt to match this content to users. This creates a graph of real-time data
flow out of the individual topics. <a href="https://github.com/nathanmarz/storm">Storm</a>
and <a href="http://samza.incubator.apache.org/">Samza</a> are popular frameworks
for implementing these kinds of transformations.
+
+<h4>Commit Log</h4>
+
+Kafka can serve as a kind of external commit-log for a distributed system. The log helps
replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore
their data. The <a href="/documentation.html#compaction">log compaction</a> feature
in Kafka helps support this usage. In this usage Kafka is similar to <a href="http://zookeeper.apache.org/bookkeeper/">Apache
BookKeeper</a> project.
\ No newline at end of file



Mime
View raw message