kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1498030 [1/3] - in /kafka/site: ./ 07/ 07/images/ 08/ diagrams/ images/ includes/
Date Sat, 29 Jun 2013 22:02:35 GMT
Author: jkreps
Date: Sat Jun 29 22:02:35 2013
New Revision: 1498030

URL: http://svn.apache.org/r1498030
Add an introduction page. Fix weird link to migration tool. Split design and implementation
pages and move to top-level nav. Add diagrams.

      - copied unchanged from r1497617, kafka/site/kafka_multidc.graffle
    kafka/site/images/kafka_log.png   (with props)
    kafka/site/images/kafka_multidc.png   (with props)
    kafka/site/images/log_anatomy.png   (with props)
    kafka/site/images/producer_consumer.png   (with props)
    kafka/site/images/tracking_high_level.png   (with props)

Added: kafka/site/design.html
URL: http://svn.apache.org/viewvc/kafka/site/design.html?rev=1498030&view=auto
--- kafka/site/design.html (added)
+++ kafka/site/design.html Sat Jun 29 22:02:35 2013
@@ -0,0 +1,265 @@
+<!--#include virtual="includes/header.html" -->
+<h2>Why we built this</h2>
+Kafka is a distributed, partitioned, replicated, multi-subscriber commit log.
+We built this because we felt that a sufficiently well implemented commit log is a basic
infrastructure primitive that could act as a replacement for a number of disparate systems doing
messaging, ETL, log aggregation, and stream processing. Our goal was a single system with
sufficient throughput and capabilities to unify all of these use cases on one platform.
+Activity stream data is a normal part of any website, used for reporting on usage of the site.
Activity data is things like page views, information about what content was shown, searches,
etc. This kind of thing is usually handled by logging the activity out to some kind of file
and then periodically aggregating these files for analysis. Operational data is data about
the performance of servers (CPU, IO usage, request times, service logs, etc) and a variety
of different approaches to aggregating operational data are used.
+In recent years, activity and operational data has become a critical part of the production
features of websites, and a slightly more sophisticated set of infrastructure is needed.
+<h2>Use cases for activity stream and operational data</h2>
+	<li>"News feed" features that broadcast the activity of your friends.</li>
+	<li>Relevance and ranking use counts, ratings, votes, or click-throughs to determine
which of a given set of items is most relevant.</li>
+	<li>Security: Sites need to block abusive crawlers, rate-limit apis, detect spamming
attempts, and maintain other detection and prevention systems that key off site activity.</li>
+	<li>Operational monitoring: Most sites need some kind of real-time, heads-up monitoring
that can track performance and trigger alerts if something goes wrong.</li>
+	<li>Reporting and Batch processing: It is common to load data into a data warehouse
or Hadoop system for offline analysis and reporting on business activity.</li>
+<h2>Characteristics of activity stream data</h2>	
+This high-throughput stream of immutable activity data represents a real computational challenge
as the volume may easily be 10x or 100x larger than the next largest data source on a site.
+<p>Traditional log file aggregation is a respectable and scalable approach to supporting
offline use cases like reporting or batch processing, but it is too high-latency for real-time
processing and tends to have rather high operational complexity. On the other hand, existing
messaging and queuing systems are okay for real-time and near-real-time use-cases, but handle
large unconsumed queues very poorly, often treating persistence as an afterthought. This creates
problems for feeding the data to offline systems like Hadoop that may only consume some sources
once per hour or per day. Kafka is intended to be a single queuing platform that can support
both offline and online use cases.
+Kafka supports fairly general messaging semantics. Nothing ties it to activity processing,
though that was our motivating use case.
+The following diagram gives a simplified view of the deployment topology at LinkedIn.
+<img src="images/tracking_high_level.png">
+Note that a single Kafka cluster handles all activity data from all different sources. This
provides a single pipeline of data for both online and offline consumers. This tier acts as
a buffer between live activity and asynchronous processing. We also use Kafka to replicate
all data to a different datacenter for offline consumption.
+It is not intended that a single Kafka cluster span data centers, but Kafka is intended to
support multi-datacenter data flow topologies. This is done by allowing mirroring or "syncing"
between clusters. This feature is very simple: the mirror cluster simply acts as a consumer
of the source cluster. This means it is possible for a single cluster to join data from many
datacenters into a single location. Here is an example of a possible multi-datacenter topology
aimed at supporting batch loads:
+<img src="images/kafka_multidc.png">
+Note that there is no correspondence between nodes in the two clusters&mdash;they may
be of different sizes, contain a different number of nodes, and a single cluster can mirror
any number of source clusters. More details on using the mirroring feature can be found <a
+<h1>Major Design Elements</h1>
+There are a small number of major design decisions that make Kafka different from most other
messaging systems:
+<li>Kafka is designed for persistent messages as the common case</li>
+<li>Throughput rather than features is the primary design constraint</li>
+<li>State about <i>what</i> has been consumed is maintained as part of
the consumer not the server</li>
+<li>Kafka is explicitly distributed. It is assumed that producers, brokers, and consumers
are all spread over multiple machines.</li>
+Each of these decisions will be discussed in more detail below.
+First some basic terminology and concepts.
+<i>Messages</i> are the fundamental unit of communication. Messages are <i>published</i>
to a <i>topic</i> by a <i>producer</i> which means they are physically
sent to a server acting as a <i>broker</i> (probably another machine). Some number
of <i>consumers</i> subscribe to a topic, and each published message is delivered
to all the consumers.
+Kafka is explicitly distributed&mdash;producers, consumers, and brokers can all be run
on a cluster of machines that co-operate as a logical group. This happens fairly naturally
for brokers and producers, but consumers require some particular support. Each consumer process
belongs to a <i>consumer group</i> and each message is delivered to exactly one
process within every consumer group. Hence a consumer group allows many processes or machines
to logically act as a single consumer. The concept of consumer group is very powerful and
can be used to support the semantics of either a <i>queue</i> or <i>topic</i>
as found in JMS. To support <i>queue</i> semantics, we can put all consumers in
a single consumer group, in which case each message will go to a single consumer. To support
<i>topic</i> semantics, each consumer is put in its own consumer group, and then
all consumers will receive each message. A more common case in our own usage is that we have
multiple logical consumer groups, each consisting of a cluster of consuming machines that
act as a logical whole. Kafka has
the added benefit in the case of large data that no matter how many consumers a topic has,
a message is stored only a single time.
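The queue-versus-topic semantics above can be illustrated with a toy model. This is not the Kafka API; the class and method names are invented, and the round-robin assignment within a group is a simplification of real consumer balancing.

```java
import java.util.*;

// Toy model of consumer-group delivery: each message goes to exactly one
// consumer within every group, so one group per consumer gives "topic"
// semantics and one shared group gives "queue" semantics.
public class GroupDelivery {
    // groups: group name -> list of consumer ids in that group.
    // Returns consumer id -> list of messages it received.
    public static Map<String, List<String>> deliver(
            List<String> messages, Map<String, List<String>> groups) {
        Map<String, List<String>> received = new HashMap<>();
        for (List<String> members : groups.values()) {
            int rr = 0; // round-robin position within this group
            for (String msg : messages) {
                String consumer = members.get(rr++ % members.size());
                received.computeIfAbsent(consumer, k -> new ArrayList<>()).add(msg);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> msgs = Arrays.asList("m1", "m2", "m3", "m4");
        // Queue semantics: all consumers share one group; each message goes
        // to exactly one of them.
        Map<String, List<String>> queue = new HashMap<>();
        queue.put("g", Arrays.asList("c1", "c2"));
        System.out.println(deliver(msgs, queue));
        // Topic semantics: each consumer in its own group; every consumer
        // sees every message.
        Map<String, List<String>> topic = new HashMap<>();
        topic.put("g1", Arrays.asList("c1"));
        topic.put("g2", Arrays.asList("c2"));
        System.out.println(deliver(msgs, topic));
    }
}
```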
+<h2>Message Persistence and Caching</h2>
+<h3>Don't fear the filesystem!</h3>
+Kafka relies heavily on the filesystem for storing and caching messages. There is a general
perception that "disks are slow" which makes people skeptical that a persistent structure
can offer competitive performance. In fact disks are both much slower and much faster than
people expect depending on how they are used; and a properly designed disk structure can often
be as fast as the network.
+The key fact about disk performance is that the throughput of hard drives has been diverging
from the latency of a disk seek for the last decade. As a result the performance of linear
writes on a six-disk 7200rpm SATA RAID-5 array is about 300MB/sec, but the performance of random
writes is only about 50k/sec&mdash;a difference of nearly 10000X. Linear reads and writes
are the most predictable of all usage patterns, and hence the ones detected and optimized best
by the operating system, using read-ahead and write-behind techniques that prefetch data in
large block multiples and group smaller logical writes into large physical writes. A further
discussion of this issue can be found in this <a href="http://deliveryimages.acm.org/10.1145/1570000/1563874/jacobs3.jpg">ACM
Queue article</a>; they actually find that sequential disk access can in some cases
be faster than random memory access!
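The two access patterns being contrasted can be made concrete. This is only an illustrative sketch (file names, block size, and total size are arbitrary): the first method appends sequentially, which the OS can service with write-behind; the second seeks to a random position before every write.

```java
import java.io.*;
import java.util.Random;

// Sequential appends versus random positioned writes (each one a
// potential disk seek). Sizes are kept tiny so this runs anywhere.
public class AccessPatterns {
    static final int BLOCK = 4096;
    static final int BLOCKS = 256; // 1 MB total

    public static long sequentialWrite(File f) throws IOException {
        byte[] block = new byte[BLOCK];
        try (BufferedOutputStream out =
                 new BufferedOutputStream(new FileOutputStream(f))) {
            for (int i = 0; i < BLOCKS; i++) out.write(block); // pure append
        }
        return f.length();
    }

    public static long randomWrite(File f) throws IOException {
        byte[] block = new byte[BLOCK];
        Random rnd = new Random(42);
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            raf.setLength((long) BLOCK * BLOCKS);
            for (int i = 0; i < BLOCKS; i++) {
                raf.seek((long) rnd.nextInt(BLOCKS) * BLOCK); // seek per write
                raf.write(block);
            }
        }
        return f.length();
    }

    public static void main(String[] args) throws IOException {
        File a = File.createTempFile("seq", ".dat");
        File b = File.createTempFile("rnd", ".dat");
        System.out.println("sequential bytes: " + sequentialWrite(a));
        System.out.println("random bytes:     " + randomWrite(b));
        a.delete();
        b.delete();
    }
}
```

On a large enough file (much bigger than the pagecache), timing these two methods shows the divergence the text describes.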
+To compensate for this performance divergence modern operating systems have become increasingly
aggressive in their use of main memory for disk caching. Any modern OS will happily divert
<i>all</i> free memory to disk caching with little performance penalty when the
memory is reclaimed. All disk reads and writes will go through this unified cache. This feature
cannot easily be turned off without using direct I/O, so even if a process maintains an in-process
cache of the data, this data will likely be duplicated in OS pagecache, effectively storing
everything twice.
+Furthermore we are building on top of the JVM, and anyone who has spent any time with Java
memory usage knows two things:
+	<li>The memory overhead of objects is very high, often doubling the size of the data
stored (or worse).</li>
+	<li>Java garbage collection becomes increasingly sketchy and expensive as the in-heap
data increases.</li>
+As a result of these factors using the filesystem and relying on pagecache is superior to
maintaining an in-memory cache or other structure&mdash;we at least double the available
cache by having automatic access to all free memory, and likely double again by storing a
compact byte structure rather than individual objects. Doing so will result in a cache of
up to 28-30GB on a 32GB machine without GC penalties. Furthermore this cache will stay warm
even if the service is restarted, whereas the in-process cache will need to be rebuilt in
memory (which for a 10GB cache may take 10 minutes) or else it will need to start with a completely
cold cache (which likely means terrible initial performance). It also greatly simplifies the
code as all logic for maintaining coherency between the cache and filesystem is now in the
OS, which tends to do so more efficiently and more correctly than one-off in-process attempts.
If your disk usage favors linear reads then read-ahead is effectively pre-populating this
cache with useful data on each disk read.
+This suggests a design which is very simple: rather than maintain as much as possible in-memory
and flush to the filesystem only when necessary, we invert that. All data is immediately written
to a persistent log on the filesystem without any call to flush the data. In effect this just
means that it is transferred into the kernel's pagecache where the OS can flush it later.
Then we add a configuration driven flush policy to allow the user of the system to control
how often data is flushed to the physical disk (every N messages or every M seconds) to put
a bound on the amount of data "at risk" in the event of a hard crash.
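The flush policy described above can be sketched as follows. This is a minimal illustration, not Kafka's log implementation; the class name and parameters are invented, but the rule is the one stated: append immediately (into pagecache) and force data to disk every N messages or every M milliseconds.

```java
import java.io.*;

// Sketch of a configuration-driven flush policy: writes land in the
// kernel's pagecache immediately; fsync happens every N messages or
// every M milliseconds, bounding the data "at risk" in a hard crash.
public class FlushPolicy {
    private final FileOutputStream out;
    private final int flushEveryN;
    private final long flushEveryMs;
    private int unflushed = 0;
    private long lastFlush = System.currentTimeMillis();

    public FlushPolicy(File f, int n, long ms) throws IOException {
        this.out = new FileOutputStream(f);
        this.flushEveryN = n;
        this.flushEveryMs = ms;
    }

    public void append(byte[] message) throws IOException {
        out.write(message); // goes to pagecache; no fsync yet
        unflushed++;
        long now = System.currentTimeMillis();
        if (unflushed >= flushEveryN || now - lastFlush >= flushEveryMs) {
            out.getFD().sync(); // force to physical disk
            unflushed = 0;
            lastFlush = now;
        }
    }

    public int unflushedCount() { return unflushed; }

    public void close() throws IOException { out.getFD().sync(); out.close(); }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("log", ".dat");
        FlushPolicy log = new FlushPolicy(f, 3, 10_000);
        for (int i = 0; i < 7; i++) log.append("msg".getBytes());
        // Flushes fired after messages 3 and 6, so one message is unflushed.
        System.out.println("unflushed after 7 appends: " + log.unflushedCount()); // 1
        log.close();
        f.delete();
    }
}
```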
+This style of pagecache-centric design is described in an <a href="http://varnish.projects.linpro.no/wiki/ArchitectNotes">article</a>
on the design of Varnish (along with a healthy helping of arrogance).
+<h3>Constant Time Suffices</h3>
+The persistent data structure used for messaging-system metadata is often a BTree. BTrees
are the most versatile data structure available, and make it possible to support a wide variety
of transactional and non-transactional semantics in the messaging system. They do come with
a fairly high cost, though: Btree operations are O(log N). Normally O(log N) is considered
essentially equivalent to constant time, but this is not true for disk operations. Disk seeks
come at 10 ms a pop, and each disk can do only one seek at a time so parallelism is limited.
Hence even a handful of disk seeks leads to very high overhead. Since storage systems mix
very fast cached operations with actual physical disk operations, the observed performance
of tree structures is often superlinear. Furthermore BTrees require a very sophisticated page
or row locking implementation to avoid locking the entire tree on each operation. The implementation
must pay a fairly high price for row-locking or else effectively serialize all reads. Because
of the heavy reliance on disk seeks it is not possible to effectively
take advantage of the improvements in drive density, and one is forced to use small (&lt;
100GB) high RPM SAS drives to maintain a sane ratio of data to seek capacity.
+Intuitively a persistent queue could be built on simple reads and appends to files as is
commonly the case with logging solutions. This structure would not support the rich
semantics of a BTree implementation, but it has the advantage that all operations are O(1)
and reads do not block writes or each other. This has obvious performance advantages since
the performance is completely decoupled from the data size--one server can now take full advantage
of a number of cheap, low-rotational speed 1+TB SATA drives. Though they have poor seek performance,
these drives often have comparable performance for large reads and writes at 1/3 the price
and 3x the capacity.
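The "simple reads and appends to files" structure can be sketched directly. This is an illustration, not Kafka's actual log format: records here are length-prefixed, appending is a write at the end of the file, and a read is one positioned read at a known byte offset, so both operations are O(1) in the data size.

```java
import java.io.*;

// A log as a file of length-prefixed records: O(1) append at the end,
// O(1) read at a known byte offset. Class and method names are invented.
public class AppendOnlyLog {
    private final RandomAccessFile file;

    public AppendOnlyLog(File f) throws IOException {
        this.file = new RandomAccessFile(f, "rw");
    }

    // Append a record; returns the byte offset where it begins.
    public long append(byte[] payload) throws IOException {
        long offset = file.length();
        file.seek(offset);
        file.writeInt(payload.length); // 4-byte length prefix
        file.write(payload);
        return offset;
    }

    // Read the record starting at the given offset: one seek, one read.
    public byte[] read(long offset) throws IOException {
        file.seek(offset);
        int len = file.readInt();
        byte[] payload = new byte[len];
        file.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("segment", ".log");
        AppendOnlyLog log = new AppendOnlyLog(f);
        long o1 = log.append("first".getBytes());
        long o2 = log.append("second".getBytes());
        System.out.println(new String(log.read(o1))); // first
        System.out.println(new String(log.read(o2))); // second
        f.delete();
    }
}
```

Note that reads never block appends here: they touch disjoint file regions.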
+Having access to virtually unlimited disk space without penalty means that we can provide
some features not usually found in a messaging system. For example, in Kafka, instead of deleting
a message immediately after consumption, we can retain messages for a relatively long period
(say a week).
+<h2>Maximizing Efficiency</h2>
+Our assumption is that the volume of messages is extremely high, indeed it is some multiple
of the total number of page views for the site (since a page view is one of the activities
we process). Furthermore we assume each message published is read at least once (and often
multiple times), hence we optimize for consumption rather than production.
+There are two common causes of inefficiency: too many network requests and excessive byte copying.
+To encourage efficiency, the APIs are built around a "message set" abstraction that naturally
groups messages. This allows network requests to group messages together and amortize the
overhead of the network roundtrip rather than sending a single message at a time.
+<p>The <code>MessageSet</code> implementation is itself a very thin API
that wraps a byte array or file. Hence there is no separate serialization or deserialization
step required for message processing; message fields are lazily deserialized as needed (or
not deserialized if not needed).
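A thin wrapper of this kind can be sketched over a byte buffer. This is an invented wire layout for illustration, not Kafka's `MessageSet` format: messages are length-prefixed, and iterating the set only skips over payloads, deserializing nothing until a payload is actually needed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;

// A "message set" as a flat byte buffer of length-prefixed messages,
// walked in place without copying or deserializing the payloads.
public class ByteMessageSet {
    public static ByteBuffer build(List<String> messages) {
        int size = 0;
        for (String m : messages)
            size += 4 + m.getBytes(StandardCharsets.UTF_8).length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (String m : messages) {
            byte[] b = m.getBytes(StandardCharsets.UTF_8);
            buf.putInt(b.length).put(b); // [length][payload]
        }
        buf.flip();
        return buf;
    }

    // Count messages by skipping length prefixes; payloads stay untouched.
    public static int count(ByteBuffer set) {
        ByteBuffer buf = set.duplicate();
        int n = 0;
        while (buf.remaining() >= 4) {
            int len = buf.getInt();
            buf.position(buf.position() + len); // lazy: skip, don't read
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        ByteBuffer set = build(Arrays.asList("a", "bb", "ccc"));
        System.out.println("messages: " + count(set));   // 3
        System.out.println("bytes:    " + set.remaining()); // 18
    }
}
```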
+The message log maintained by the broker is itself just a directory of message sets that
have been written to disk. This abstraction allows a single byte format to be shared by both
the broker and the consumer (and to some degree the producer, though producer messages are
checksummed and validated before being added to the log).
+Maintaining this common format allows optimization of the most important operation: network
transfer of persistent log chunks. Modern unix operating systems offer a highly optimized
code path for transferring data out of pagecache to a socket; in Linux this is done with the
sendfile system call. Java provides access to this system call with the <code>FileChannel.transferTo</code> API.
+To understand the impact of sendfile, it is important to understand the common data path
for transfer of data from file to socket:
+	<li>The operating system reads data from the disk into pagecache in kernel space</li>
+	<li>The application reads the data from kernel space into a user-space buffer</li>
+	<li>The application writes the data back into kernel space into a socket buffer</li>
+	<li>The operating system copies the data from the socket buffer to the NIC buffer
where it is sent over the network</li>
+This is clearly inefficient: there are four copies and two system calls. Using sendfile, this
re-copying is avoided by allowing the OS to send the data from pagecache to the network directly.
So in this optimized path, only the final copy to the NIC buffer is needed.
+We expect a common use case to be multiple consumers on a topic. Using the zero-copy optimization
above, data is copied into pagecache exactly once and reused on each consumption instead of
being stored in memory and copied out to kernel space every time it is read. This allows messages
to be consumed at a rate that approaches the limit of the network connection.
+For more background on the sendfile and zero-copy support in Java, see this <a href="http://www.ibm.com/developerworks/linux/library/j-zerocopy">article</a>
on IBM developerworks.	
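A minimal use of `FileChannel.transferTo` looks like this. The target here is a file channel rather than a socket (so the example is self-contained), but the same call works with a socket channel, which is where the sendfile fast path applies; the class name is invented for illustration.

```java
import java.io.*;
import java.nio.channels.*;

// Moving bytes out of a file with FileChannel.transferTo, the Java
// entry point to sendfile, without copying through user space.
public class ZeroCopyTransfer {
    public static long transfer(File src, File dst) throws IOException {
        try (FileChannel in = new FileInputStream(src).getChannel();
             FileChannel out = new FileOutputStream(dst).getChannel()) {
            long pos = 0, size = in.size();
            while (pos < size) {
                // transferTo may move fewer bytes than requested, so loop.
                pos += in.transferTo(pos, size - pos, out);
            }
            return pos;
        }
    }

    public static void main(String[] args) throws IOException {
        File src = File.createTempFile("chunk", ".log");
        try (FileWriter w = new FileWriter(src)) { w.write("log chunk payload"); }
        File dst = File.createTempFile("copy", ".log");
        System.out.println("transferred bytes: " + transfer(src, dst));
        src.delete();
        dst.delete();
    }
}
```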
+<h2>End-to-end Batch Compression</h2>
+In many cases the bottleneck is actually not CPU but network. This is particularly true for
a data pipeline that needs to send messages across data centers. Of course the user can always
send compressed messages without any support needed from Kafka, but this can lead to very
poor compression ratios as much of the redundancy is due to repetition between messages (e.g.
field names in JSON or user agents in web logs or common string values). Efficient compression
requires compressing multiple messages together rather than compressing each message individually.
Ideally this would be possible in an end-to-end fashion&mdash;that is, data would be compressed
prior to sending by the producer and remain compressed on the server, only being decompressed
by the eventual consumers.
+Kafka supports this by allowing recursive message sets. A batch of messages can be clumped
together, compressed, and sent to the server in this form. This batch of messages will all
be delivered to the same consumer and will remain in compressed form until it arrives there.
+Kafka supports GZIP and Snappy compression protocols. More details on compression can be
found <a href="https://cwiki.apache.org/confluence/display/KAFKA/Compression">here</a>.
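The effect of batch compression on repetitive data is easy to demonstrate. The messages below are invented, but the shape matches the argument above: the repeated field names only compress away when the messages are compressed together.

```java
import java.io.*;
import java.util.*;
import java.util.zip.GZIPOutputStream;

// Compressing 100 similar JSON-ish messages one at a time versus as a
// single batch; the batch wins because redundancy spans messages.
public class BatchCompression {
    public static int gzipSize(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) { gz.write(data); }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++)
            messages.add("{\"user_agent\":\"Mozilla/5.0\",\"page\":\"/p/" + i + "\"}");

        int perMessage = 0;
        StringBuilder batch = new StringBuilder();
        for (String m : messages) {
            perMessage += gzipSize(m.getBytes()); // per-message overhead adds up
            batch.append(m);
        }
        int batched = gzipSize(batch.toString().getBytes());
        System.out.println("compressed one at a time: " + perMessage + " bytes");
        System.out.println("compressed as a batch:    " + batched + " bytes");
    }
}
```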
+<h2>Consumer state</h2>
+Keeping track of <i>what</i> has been consumed is one of the key things a messaging
system must provide. It is not intuitive, but recording this state is one of the key performance
points for the system. State tracking requires updating a persistent entity and potentially
causes random accesses. Hence it is likely to be bound by the seek time of the storage system
not the write bandwidth (as described above).
+Most messaging systems keep metadata about what messages have been consumed on the broker.
That is, as a message is handed out to a consumer, the broker records that fact locally. This
is a fairly intuitive choice, and indeed for a single machine server it is not clear where
else it could go. Since the data structures used for storage in many messaging systems scale
poorly, this is also a pragmatic choice--since the broker knows what is consumed it can immediately
delete it, keeping the data size small.
+What is perhaps not obvious is that getting the broker and consumer to come into agreement
about what has been consumed is not a trivial problem. If the broker records a message as
<b>consumed</b> immediately every time it is handed out over the network, then
if the consumer fails to process the message (say because it crashes or the request times
out or whatever) then that message will be lost. To solve this problem, many messaging systems
add an acknowledgement feature which means that messages are only marked as <b>sent</b>
not <b>consumed</b> when they are sent; the broker waits for a specific acknowledgement
from the consumer to record the message as <b>consumed</b>. This strategy fixes
the problem of losing messages, but creates new problems. First of all, if the consumer processes
the message but fails before it can send an acknowledgement then the message will be consumed
twice. The second problem is performance: now the broker must keep multiple states about
every single message (first to lock it so it is not given out a second time, and then to mark it
as permanently consumed so that it can be removed). Tricky problems must be dealt with, like
what to do with messages that are sent but never acknowledged.
+<h3>Message delivery semantics</h3>
+So clearly there are multiple possible message delivery guarantees that could be provided:
+  <li>
+	<i>At most once</i>&mdash;this handles the first case described. Messages
are immediately marked as consumed, so they can't be given out twice, but many failure scenarios
may lead to losing messages.
+  </li>
+  <li>
+	<i>At least once</i>&mdash;this is the second case where we guarantee each
message will be delivered at least once, but in failure cases may be delivered twice.
+  </li>
+  <li>
+	<i>Exactly once</i>&mdash;this is what people actually want, each message
is delivered once and only once.
+  </li>
+This problem is heavily studied, and is a variation of the "transaction commit" problem.
Algorithms that provide exactly once semantics exist, two- or three-phase commits and Paxos
variants being examples, but they come with some drawbacks. They typically require multiple
round trips and may have poor guarantees of liveness (they can halt indefinitely). The FLP
result provides some of the fundamental limitations on these algorithms.
+Kafka does two unusual things with respect to metadata. First the stream is partitioned on
the brokers into a set of distinct partitions. The semantic meaning of these partitions is
left up to the producer and the producer specifies which partition a message belongs to. Within
a partition messages are stored in the order in which they arrive at the broker, and will
be given out to consumers in that same order. This means that rather than store metadata for
each message (marking it as consumed, say), we just need to store the "high water mark" for
each combination of consumer, topic, and partition. Hence the total metadata required to summarize
the state of the consumer is actually quite small. In Kafka we refer to this high-water mark
as "the offset" for reasons that will become clear in the implementation section.
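The "high water mark" observation can be made concrete with a sketch. This is not Kafka's implementation; the names are invented. The point is that consumer state collapses to one number per (group, topic, partition), however many messages flow through.

```java
import java.util.*;

// Consumer state as a single offset per consumer group, topic, and
// partition, instead of per-message consumed/unconsumed flags.
public class OffsetTracker {
    private final Map<String, Long> highWaterMarks = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Record that everything up to (and including) this offset is consumed.
    public void commit(String group, String topic, int partition, long offset) {
        highWaterMarks.put(key(group, topic, partition), offset);
    }

    public long committed(String group, String topic, int partition) {
        return highWaterMarks.getOrDefault(key(group, topic, partition), -1L);
    }

    public static void main(String[] args) {
        OffsetTracker t = new OffsetTracker();
        t.commit("etl", "clicks", 0, 41);
        t.commit("etl", "clicks", 0, 42); // advancing is one map update
        System.out.println(t.committed("etl", "clicks", 0)); // 42
        System.out.println(t.committed("etl", "clicks", 1)); // -1: nothing yet
    }
}
```

Rewinding (discussed below) is just committing a smaller offset.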
+<h3>Consumer state</h3>
+In Kafka, the consumers are responsible for maintaining state information (offset) on what
has been consumed. Typically, the Kafka consumer library writes its state data to Zookeeper.
However, it may be beneficial for consumers to write state data into the same datastore where
they are writing the results of their processing.  For example, the consumer may simply be
entering some aggregate value into a centralized transactional OLTP database. In this case
the consumer can store the state of what is consumed in the same transaction as the database
modification. This solves a distributed consensus problem, by removing the distributed part!
A similar trick works for some non-transactional systems as well. A search system can store
its consumer state with its index segments. Though it may provide no durability guarantees,
this means that the index is always in sync with the consumer state: if an unflushed index
segment is lost in a crash, the indexes can always resume consumption from the latest
checkpointed offset. Likewise, our Hadoop load job, which does parallel loads from Kafka,
does a similar trick. Individual mappers write the offset of the last consumed message
to HDFS at the end of the map task. If a job fails and gets restarted, each mapper simply
restarts from the offsets stored in HDFS.
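The "store the offset with the output" trick can be sketched as follows. Everything here is illustrative: the "database" is an in-memory map standing in for a transactional store, and the names are invented. The key property is that the result and the offset are updated in the same atomic step, so a restart resumes exactly where the stored results end.

```java
import java.util.*;

// Committing an aggregate update and the consumer offset together, so
// the stored results and the consumption position can never disagree.
public class AtomicOffsetStore {
    private long committedOffset = -1;
    private final Map<String, Long> aggregates = new HashMap<>();

    // One "transaction": apply the update and record the offset together.
    public synchronized void commit(String key, long delta, long offset) {
        aggregates.merge(key, delta, Long::sum);
        committedOffset = offset; // same atomic step as the result write
    }

    public synchronized long resumeFrom() { return committedOffset + 1; }

    public synchronized long value(String key) {
        return aggregates.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        AtomicOffsetStore store = new AtomicOffsetStore();
        store.commit("profile_views:alice", 1, 0);
        store.commit("profile_views:alice", 1, 1);
        // After a crash and restart, consumption picks up at offset 2:
        System.out.println("resume from offset " + store.resumeFrom()); // 2
        System.out.println("views: " + store.value("profile_views:alice")); // 2
    }
}
```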
+There is a side benefit of this decision. A consumer can deliberately <i>rewind</i>
back to an old offset and re-consume data. This violates the common contract of a queue, but
turns out to be an essential feature for many consumers. For example, if the consumer code
has a bug and is discovered after some messages are consumed, the consumer can re-consume
those messages once the bug is fixed.
+<h3>Push vs. pull</h3>
+A related question is whether consumers should pull data from brokers or brokers should push
data to the subscriber. In this respect Kafka follows a more traditional design, shared by
most messaging systems, where data is pushed to the broker from the producer and pulled from
the broker by the consumer. Some recent systems, such as <a href="http://github.com/facebook/scribe">scribe</a>
and <a href="http://github.com/cloudera/flume">flume</a>, which focus on log aggregation,
follow a very different push based path where each node acts as a broker and data is pushed
downstream. There are pros and cons to both approaches. However a push-based system has difficulty
dealing with diverse consumers as the broker controls the rate at which data is transferred.
The goal is generally for the consumer to be able to consume at the maximum possible rate;
unfortunately, in a push system this means the consumer tends to be overwhelmed when its rate
of consumption falls below the rate of production (a denial of service attack, in essence).
A pull-based system has the nicer property that the
consumer simply falls behind and catches up when it can. This can be mitigated with some kind
of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the
rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it
seems. Previous attempts at building systems in this fashion led us to go with a more traditional
pull model.
+Kafka is built to be run across a cluster of machines as the common case. There is no central
"master" node. Brokers are peers to each other and can be added and removed at any time without
any manual configuration changes. Similarly, producers and consumers can be started dynamically
at any time. Each broker registers some metadata (e.g., available topics) in Zookeeper. Producers
and consumers can use Zookeeper to discover topics and to co-ordinate the production and consumption.
The details of producers and consumers will be described below.
+<h3>Automatic producer load balancing</h3>
+Kafka supports client-side load balancing for message producers or use of a dedicated load
balancer to balance TCP connections. A dedicated layer-4 load balancer works by balancing
TCP connections over Kafka brokers. In this configuration all messages from a given producer
go to a single broker. The advantage of using a layer-4 load balancer is that each producer
only needs a single TCP connection, and no connection to zookeeper is needed. The disadvantage
is that the balancing is done at the TCP connection level, and hence it may not be well balanced
(if some producers produce many more messages than others, evenly dividing up the connections
per broker may not result in evenly dividing up the messages per broker).
+Client-side zookeeper-based load balancing solves some of these problems. It allows the producer
to dynamically discover new brokers, and balance load on a per-request basis. Likewise it
allows the producer to partition data according to some key instead of randomly, which enables
stickiness on the consumer (e.g. partitioning data consumption by user id). This feature is
called "semantic partitioning", and is described in more detail below.
+The zookeeper-based load balancing works as follows. Zookeeper watchers
are registered on the following events&mdash;
+<li>a new broker comes up</li>
+<li>a broker goes down</li>
+<li>a new topic is registered</li>
+<li>a broker gets registered for an existing topic</li>
+Internally, the producer maintains an elastic pool of connections to the brokers, one per
broker. This pool is kept updated to establish/maintain connections to all the live brokers,
through the zookeeper watcher callbacks. When a producer request for a particular topic comes
in, a broker partition is picked by the partitioner (see section on semantic partitioning).
A connection from the pool is then used to send the data to the selected broker.
+<h3>Asynchronous send</h3>
+Asynchronous non-blocking operations are fundamental to scaling messaging systems. In Kafka,
the producer provides an option to use asynchronous dispatch of produce requests (producer.type=async).
This allows buffering of produce requests in an in-memory queue and batch sends that are triggered
by a time interval or a pre-configured batch size. Since data is typically published from a
set of heterogeneous machines producing data at variable rates, this asynchronous buffering
helps generate uniform traffic to the brokers, leading to better network utilization and higher
throughput.
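The asynchronous batching described above can be sketched as follows. This is illustrative, not the Kafka producer: the class name and batch-size trigger are invented, and the time trigger is shown only as an explicit flush call rather than a background timer thread.

```java
import java.util.*;
import java.util.concurrent.*;

// Produce requests buffer in an in-memory queue and go out as batches,
// triggered by a batch-size threshold (a real producer also flushes on
// a timer).
public class AsyncBatcher {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final int batchSize;
    private final List<List<String>> sentBatches = new ArrayList<>();

    public AsyncBatcher(int batchSize) { this.batchSize = batchSize; }

    public void send(String message) {
        queue.add(message); // non-blocking from the caller's view
        if (queue.size() >= batchSize) flush(); // size trigger
    }

    public void flush() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (!batch.isEmpty()) sentBatches.add(batch); // one network round trip
    }

    public List<List<String>> batches() { return sentBatches; }

    public static void main(String[] args) {
        AsyncBatcher producer = new AsyncBatcher(3);
        for (int i = 0; i < 7; i++) producer.send("event-" + i);
        producer.flush(); // drain the partial batch, as a timer would
        System.out.println("batches sent: " + producer.batches().size()); // 3
    }
}
```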
+<h3>Semantic partitioning</h3>
+Consider an application that would like to maintain an aggregation of the number of profile
visitors for each member. It would like to send all profile visit events for a member to a
particular partition and, hence, have all updates for a member appear in the same stream
for the same consumer thread. The producer has the capability to be able to semantically map
messages to the available Kafka nodes and partitions. This allows partitioning the stream
of messages with some semantic partition function based on some key in the message to spread
them over broker machines. The partitioning function can be customized by providing an implementation
of the kafka.producer.Partitioner interface, default being the random partitioner. For the
example above, the key would be member_id and the partitioning function would be hash(member_id)%num_partitions.
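The profile-visit example can be sketched directly. The interface below only mirrors the shape of kafka.producer.Partitioner for illustration; it is not the actual Kafka class, and the member id is invented.

```java
// A semantic partitioning function: hash(member_id) % num_partitions,
// so every event for the same member lands in the same partition.
public class SemanticPartitioning {
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    static class MemberIdPartitioner implements Partitioner<Integer> {
        public int partition(Integer memberId, int numPartitions) {
            return Math.abs(memberId.hashCode()) % numPartitions;
        }
    }

    public static void main(String[] args) {
        Partitioner<Integer> p = new MemberIdPartitioner();
        int numPartitions = 8;
        int memberId = 123456;
        // Repeated calls for the same member are deterministic, which is
        // what gives the consumer-side stickiness described above:
        System.out.println(p.partition(memberId, numPartitions));
        System.out.println(p.partition(memberId, numPartitions));
    }
}
```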
+<h2>Support for Hadoop and other batch data load</h2>
+Scalable persistence allows for the possibility of supporting batch data loads that periodically
snapshot data into an offline system for batch processing.  We make use of this for loading
data into our data warehouse and Hadoop clusters.
+Batch processing happens in stages beginning with the data load stage and proceeding in an
acyclic graph of processing and output stages (e.g. as supported <a href="../azkaban">here</a>).
An essential feature of support for this model is the ability to re-run the data load from
a point in time (in case anything goes wrong).
+In the case of Hadoop we parallelize the data load by splitting the load over individual
map tasks, one for each node/topic/partition combination, allowing full parallelism in the
loading. Hadoop provides the task management, and tasks which fail can restart without danger
of duplicate data.
+<!--#include virtual="../includes/footer.html" -->
