kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1153025 [2/2] - in /incubator/kafka/site: ./ images/ includes/
Date Tue, 02 Aug 2011 04:31:45 GMT
Added: incubator/kafka/site/projects.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/projects.html?rev=1153025&view=auto
==============================================================================
--- incubator/kafka/site/projects.html (added)
+++ incubator/kafka/site/projects.html Tue Aug  2 04:31:40 2011
@@ -0,0 +1,112 @@
+<!--#include virtual="includes/header.html" -->
+
+<h1>Current Work</h1>
+	
+<p>
+  Below is a list of major projects we know people are currently pursuing. If you have thoughts
on these or want to help, please <a href="http://groups.google.com/group/kafka-dev">let
us know</a>.
+</p>
+
+<h3>Improved Stream Processing Libraries</h3>
+
+<p>
+We recently added the rich producer library that allows partitioned message production. This, combined with the partition affinity of the consumers, makes it possible to do partitioned stream processing. The patterns and libraries to support this are not yet well developed. What we have in mind is a Scala DSL to make it easy to group, aggregate, and otherwise transform these infinite streams.
+</p>
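+
+<p>
+For concreteness, here is a rough Java sketch (imports omitted) of the kind of hand-written grouping such a DSL would replace, using the high-level consumer API shown in the quickstart. The assumption that <code>stream</code> is a message stream obtained as in the quickstart consumer example, and the word-count logic itself, are purely illustrative:
+</p>
+
+<pre>
+<small>// count occurrences of each message payload from one consumer stream (illustrative only)</small>
+Map&lt;String, Integer&gt; counts = new HashMap&lt;String, Integer&gt;();
+for (Message message : stream) {
+  String key = Utils.toString(message.payload(), "UTF-8");
+  Integer current = counts.get(key);
+  counts.put(key, current == null ? 1 : current + 1);
+}
+</pre>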
+
+<h3>Replication</h3>
+
+<p>
+Messages are currently written to a single broker with no replication between brokers. We
would like to provide replication between brokers and expose options to the producer to block
until a configurable number of replicas have acknowledged the message to allow the client
to control the fault-tolerance semantics.
+</p>
+
+<h3>Compression</h3>
+
+<p>
+We have a patch that provides end-to-end message set compression from producer to broker
and broker to consumer with no need for intervening decompression. We hope to add this feature
soon.
+</p>
+
+<h1>Project Ideas</h1>
+
+<p>
+Below is a list of projects which would be great to have but haven't yet been started. Ping
the <a href="http://groups.google.com/group/kafka-dev">mailing list</a> if you
are interested in working on any of these.
+</p>
+
+<h3>Clients In Other Languages</h3>
+
+<p>
+We offer a JVM-based client for production and consumption and also a rather primitive native Python client. It would be great to grow this list. The lower-level protocols are well documented <a href="design.php">here</a> and should be relatively easy to implement in any language that supports standard socket I/O.
+</p>
+
+<h3>Convert Hadoop InputFormat or OutputFormat to Scala</h3>
+<p>
+We have a Hadoop InputFormat and OutputFormat that were contributed and are in use at LinkedIn. This code is in Java, though, which means it doesn't fit in well with the rest of the project. It would be good to convert this code to Scala to keep things consistent.
+</p>
+
+<h3>Long Poll</h3>
+
+<p>
+The consumer currently uses a simple polling mechanism. The fetch request always returns immediately, yielding no data if no new messages have arrived, and the consumer uses a simple backoff mechanism to avoid too-frequent requests to the broker when there are no new messages. This is efficient enough, but the lowest possible latency of the consumer is bounded by the polling frequency. It would be nice to enhance the consumer API to allow an option in the fetch request to have the server block for a given period of time waiting for data to become available rather than returning immediately and then waiting to poll again. This would provide somewhat improved latency in the low-throughput case where the consumer is often waiting for a message to arrive.
+</p>
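+
+<p>
+Purely as an illustration of the proposal (this parameter does not exist in the current API), a blocking fetch might look like the existing <code>FetchRequest</code> with an extra maximum-wait argument:
+</p>
+
+<pre>
+<small>// hypothetical: ask the broker to hold the request for up to 100 ms if no data is available</small>
+<small>// (the maxWait argument is NOT part of the current FetchRequest)</small>
+FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000, 100 /* maxWait ms */);
+ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
+</pre>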
+
+<h3>Syslogd Producer</h3>
+
+<p>
+We currently have a custom producer and a log4j appender that work for "logging"-type applications. Outside the Java world, however, the standard for logging is syslogd. It would be great to have an asynchronous producer that worked with syslogd to support these kinds of applications.
+</p>
+
+<h3>Hierarchical Topics</h3>
+
+<p>
+Currently streams are divided into only two levels&mdash;topics and partitions. This
is unnecessarily limited. We should add support for hierarchical topics and allow subscribing
to an arbitrary subset of paths. For example one could have /events/clicks and /events/logins
and one could subscribe to either of these alone or get the merged stream by subscribing to
the parent directory /events.
+</p>
+
+<p>
+In this model, partitions are naturally just subtopics (for example /events/clicks/0 might
be one partition). This reduces the conceptual weight of the system and adds some power.
+</p>
+
+<h3>Pluggable Consumer Offset Storage Strategies</h3>
+
+<p>
+Currently consumer offsets are persisted in Zookeeper, which works well for many use cases. There is no inherent reason the offsets need to be stored there, however. We should expose a pluggable interface to allow alternate storage mechanisms.
+</p>
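+
+<p>
+One possible shape for such an interface, shown purely as a hypothetical sketch (none of these names exist in the codebase today):
+</p>
+
+<pre>
+<small>// hypothetical plug-in point for persisting consumer offsets outside Zookeeper</small>
+public interface OffsetStorage {
+  long readOffset(String group, String topic, int partition);
+  void commitOffset(String group, String topic, int partition, long offset);
+}
+</pre>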
+
+<h1>Recently Completed Projects</h1>
+
+The following are some recently completed projects from this list.
+
+<h3>Hadoop Consumer</h3>
+<p>
+Provide an InputFormat for Hadoop to allow running Map/Reduce jobs on top of Kafka data.
+</p>
+
+<h3>Hadoop Producer</h3>
+<p>
+Provide an OutputFormat for Hadoop to allow Map/Reduce jobs to publish data to Kafka.
+</p>
+
+<h3>Console Consumer</h3>
+<p>
+The interaction with zookeeper and the complexity of the elastic load balancing of consumers make implementing the equivalent of the rich consumer interface outside of the JVM somewhat difficult (implementing the low-level fetch API is quite easy). A simple approach to this problem works similarly to Hadoop Streaming: provide a consumer which dumps to standard output in some user-controllable format. This can be piped to another program in any language which simply reads from standard input to receive the data from the stream.
+</p>
+
+<h3>Rich Producer Interface</h3>
+<p>
+The original producer connected to a single broker and published all data there. This feature adds a higher-level API that allows a cluster-aware producer to semantically map messages to Kafka nodes and partitions. This makes it possible to partition the stream of messages with a semantic partition function based on a key in the message, spreading them over broker machines&mdash;e.g. to ensure that all messages for a particular user go to a particular partition and hence appear in the same stream for the same consumer thread.
+</p>
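+
+<p>
+For example, with the new producer API (see the quickstart for the full setup), a message can be sent with a partition key so that all messages for the same key land in the same partition; the topic, key, and message below are illustrative:
+</p>
+
+<pre>
+ProducerData&lt;String, String&gt; data = new ProducerData&lt;String, String&gt;("clicks", "user-123", "clicked /home");
+producer.send(data);
+</pre>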
+
+<h1>Project ideas for Scalathon</h1>
+
+The following are some smaller features that you can hack on to play with Kafka -
+
+<h3>RESTful producer API</h3>
+We need to make the Kafka server support RESTful producer requests. This would allow Kafka to be used from any programming language without implementing the wire protocol in each one. It would also make it easier for web applications to produce data to Kafka. Please refer to the
<a href="http://linkedin.jira.com/browse/KAFKA-71">JIRA</a> to contribute. 
+
+<h3>Pluggable decoder for the consumer</h3>
+Since 0.6, the <a href="http://sna-projects.com/kafka/javadoc/current/">producer</a>
allows a user to plug in an Encoder that converts data of type T to a Kafka message. We need
to do the same thing on the consumer side, by allowing the user to plug in a Decoder that
converts a message into an object of type T. Please refer to the <a href="http://linkedin.jira.com/browse/KAFKA-70">JIRA</a>
to contribute.
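+
+<p>
+By analogy with the existing <code>Encoder</code> (see the custom Encoder example in the quickstart), the consumer-side interface might look roughly like the following. This is a hypothetical sketch of a feature that has not been written yet; the interface, class, and method names are illustrative:
+</p>
+
+<pre>
+<small>// hypothetical mirror image of Encoder&lt;T&gt;: turn a Kafka Message back into a T</small>
+class TrackingDataDeserializer implements Decoder&lt;TrackingData&gt; {
+  public TrackingData toEvent(Message message) {
+    <small>// e.g. decode the message payload with your own Avro (or other) deserializer</small>
+    return CustomAvroDecoder.getEvent(message.payload());
+  }
+}
+</pre>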
+
+<h3>Producer ACK</h3>
+Currently, the <a href="http://sna-projects.com/kafka/javadoc/current/">producer</a>
does not wait for an acknowledgement (ACK) from the Kafka server. The producer just sends
the data across and the server appends it to the appropriate log for a topic, but doesn't
send an ACK back to the producer. Ideally, after handling the producer's request and writing the data to the log, the server should send an ACK back to the producer, and the producer should send the next request only after it receives that ACK. Please
refer to the <a href="http://linkedin.jira.com/browse/KAFKA-16">JIRA</a> to contribute.
+
+<h3>Size-based retention policy</h3>
+The Kafka server garbage collects logs according to a time-based retention policy (log.retention.hours). Ideally, the server should also support a size-based retention policy (log.retention.size)
to prevent any one topic from occupying too much disk space. Please refer to the <a href="http://linkedin.jira.com/browse/KAFKA-3">JIRA</a>
to contribute.
+
+<!--#include virtual="includes/footer.html" -->
\ No newline at end of file

Added: incubator/kafka/site/quickstart.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/quickstart.html?rev=1153025&view=auto
==============================================================================
--- incubator/kafka/site/quickstart.html (added)
+++ incubator/kafka/site/quickstart.html Tue Aug  2 04:31:40 2011
@@ -0,0 +1,307 @@
+<!--#include virtual="includes/header.html" -->
+
+<h2>Quick Start</h2>
+	
+<h3> Step 1: Download the code </h3>
+
+<a href="downloads" title="Kafka downloads">Download</a> a recent stable release.
+
+<pre>
+<b>&gt; tar xzf kafka-&lt;VERSION&gt;.tgz</b>
+<b>&gt; cd kafka-&lt;VERSION&gt;</b>
+</pre>
+
+<h3>Step 2: Start the server</h3>
+
+<p>
+First start the zookeeper server; Kafka brokers and consumers use it for coordination. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node zookeeper instance.
+
+<pre>
+<b>&gt; bin/zookeeper-server-start.sh config/zookeeper.properties</b>
+[2010-11-21 23:45:02,335] INFO Reading configuration from: config/zookeeper.properties 
+...
+</pre>
+
+Now start the Kafka server:
+<pre>
+<b>&gt; bin/kafka-server-start.sh config/server.properties</b>
+jkreps-mn-2:kafka-trunk jkreps$ bin/kafka-server-start.sh config/server.properties 
+[2010-11-21 23:51:39,608] INFO starting log cleaner every 60000 ms (kafka.log.LogManager)
+[2010-11-21 23:51:39,628] INFO connecting to ZK: localhost:2181 (kafka.server.KafkaZooKeeper)
+...
+</pre>
+
+<h3>Step 3: Send some messages</h3>
+
+A toy producer script is available to send plain text messages. To use it, run the following
command:
+
+<pre>
+<b>&gt; bin/kafka-producer-shell.sh --server kafka://localhost:9092 --topic test</b>
+> hello
+sent: hello (14 bytes)
+> world
+sent: world (14 bytes)
+</pre>
+
+<h3>Step 4: Start a consumer</h3>
+
+Start a toy consumer to dump out the messages you sent to the console:
+
+<pre>
+<b>&gt; bin/kafka-consumer-shell.sh --topic test --props config/consumer.properties</b>
+Starting consumer...
+...
+consumed: hello
+consumed: world
+</pre>
+
+If you have each of the above commands running in a different terminal then you should now
be able to type messages into the producer terminal and see them appear in the consumer terminal.
+
+<h3>Step 5: Write some code</h3>
+
+Below are some very simple examples of using Kafka for sending messages; more complete examples can be found in the Kafka source code in the examples/ directory.
+
+<h4>Producer Code</h4>
+
+<h5>1. Log4j appender </h5>
+
+Data can also be produced to a Kafka server via a log4j appender. This way, minimal code needs to be written in order to send data to the Kafka server.
+Here is an example of how to use the Kafka log4j appender -
+
+Start by defining the Kafka appender in your log4j.properties file.
+<pre>
+<small>// define the kafka log4j appender config parameters</small>
+log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
+<small>// <b>REQUIRED</b>: set the hostname of the kafka server</small>
+log4j.appender.KAFKA.Host=localhost
+<small>// <b>REQUIRED</b>: set the port on which the Kafka server is listening
for connections</small>
+log4j.appender.KAFKA.Port=9092
+<small>// <b>REQUIRED</b>: the topic under which the logger messages are
to be posted</small>
+log4j.appender.KAFKA.Topic=test
+<small>// the serializer to be used to turn an object into a Kafka message. Defaults
to kafka.producer.DefaultStringEncoder</small>
+log4j.appender.KAFKA.Serializer=kafka.test.AppenderStringSerializer
+<small>// do not set the above KAFKA appender as the root appender</small>
+log4j.rootLogger=INFO
+<small>// set the logger for your package to be the KAFKA appender</small>
+log4j.logger.your.test.package=INFO, KAFKA
+</pre>
+
+Data can be sent using a log4j appender as follows -
+
+<pre>
+Logger logger = Logger.getLogger([your.test.class]);
+logger.info("message from log4j appender");
+</pre>
+
+If your log4j appender fails to send messages, please verify that the correct 
+log4j properties file is being used. You can add 
+<code>-Dlog4j.debug=true</code> to your VM parameters to verify this.
+
+<h5>2. Producer API </h5>
+
+With release 0.6, we introduced a new producer API - <code>kafka.producer.Producer&lt;T&gt;</code>. Here are examples of using the producer; a complete minimal program assembled from these snippets follows the list.
+
+<ol>
+<li>First, start a local instance of the zookeeper server
+<pre>./bin/zookeeper-server-start.sh config/zookeeper.properties</pre>
+</li>
+<li>Next, start a kafka broker
+<pre>./bin/kafka-server-start.sh config/server.properties</pre>
+</li>
+<li>Now, create the producer with all configuration defaults and use zookeeper based
broker discovery.
+<pre>
+import java.util.List;
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.producer.ProducerConfig;
+
+...
+
+Properties props = new Properties();
+props.put("zk.connect", "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+ProducerConfig config = new ProducerConfig(props);
+Producer&lt;String, String&gt; producer = new Producer&lt;String, String&gt;(config);
+</pre>
+</li>
+<li>Send a single message
+<pre>
+<small>// The message is sent to a randomly selected partition registered in ZK</small>
+ProducerData&lt;String, String&gt; data = new ProducerData&lt;String, String&gt;("test-topic",
"test-message");
+producer.send(data);	
+</pre>
+</li>
+<li>Send multiple messages to multiple topics in one request
+<pre>
+List&lt;String&gt; messages = new java.util.ArrayList&lt;String&gt;();
+messages.add("test-message1");
+messages.add("test-message2");
+ProducerData&lt;String, String&gt; data1 = new ProducerData&lt;String, String&gt;("test-topic1",
messages);
+ProducerData&lt;String, String&gt; data2 = new ProducerData&lt;String, String&gt;("test-topic2",
messages);
+List&lt;ProducerData&lt;String, String&gt;&gt; dataForMultipleTopics = new
ArrayList&lt;ProducerData&lt;String, String&gt;&gt;();
+dataForMultipleTopics.add(data1);
+dataForMultipleTopics.add(data2);
+producer.send(dataForMultipleTopics);	
+</pre>
+</li>
+<li>Send a message with a partition key. Messages with the same key are sent to the
same partition
+<pre>
+ProducerData&lt;String, String&gt; data = new ProducerData&lt;String, String&gt;("test-topic",
"test-key", "test-message");
+producer.send(data);
+</pre>
+</li>
+<li>Use your custom partitioner
+<p>If you are using zookeeper based broker discovery, <code>kafka.producer.Producer&lt;T&gt;</code>
routes your data to a particular broker partition based on a <code>kafka.producer.Partitioner&lt;T&gt;</code>,
specified through the <code>partitioner.class</code> config parameter. It defaults
to <code>kafka.producer.DefaultPartitioner</code>. If you don't supply a partition
key, then it sends each request to a random broker partition.</p>
+<pre>
+class MemberIdPartitioner extends Partitioner[MemberIdLocation] {
+  def partition(data: MemberIdLocation, numPartitions: Int): Int = {
+    (data.location.hashCode % numPartitions)
+  }
+}
+<small>// create the producer config to plug in the above partitioner</small>
+Properties props = new Properties();
+props.put("zk.connect", "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+props.put("partitioner.class", "xyz.MemberIdPartitioner");
+ProducerConfig config = new ProducerConfig(props);
+Producer&lt;String, String&gt; producer = new Producer&lt;String, String&gt;(config);
+</pre>
+</li>
+<li>Use custom Encoder 
+<p>The producer takes in a required config parameter <code>serializer.class</code>
that specifies an <code>Encoder&lt;T&gt;</code> to convert T to a Kafka
Message. Default is the no-op kafka.serializer.DefaultEncoder.
+Here is an example of a custom Encoder -</p>
+<pre>
+class TrackingDataSerializer extends Encoder&lt;TrackingData&gt; {
+  <small>// Say you want to use your own custom Avro encoding</small>
+  CustomAvroEncoder avroEncoder = new CustomAvroEncoder();
+  def toMessage(event: TrackingData):Message = {
+	new Message(avroEncoder.getBytes(event));
+  }
+}
+</pre>
+If you want to use the above Encoder, pass it in to the "serializer.class" config parameter
+<pre>
+Properties props = new Properties();
+props.put("serializer.class", "xyz.TrackingDataSerializer");
+</pre>
+</li>
+<li>Use a static list of brokers instead of zookeeper-based broker discovery
+<p>Some applications would rather not depend on zookeeper. In that case, the config parameter <code>broker.list</code>
+can be used to specify the list of all brokers in your Kafka cluster, in the following format:
+<code>broker_id1:host1:port1, broker_id2:host2:port2...</code></p>
+<pre>
+<small>// you can stop the zookeeper instance as it is no longer required</small>
+./bin/zookeeper-server-stop.sh	
+<small>// create the producer config object </small>
+Properties props = new Properties();
+props.put("broker.list", "0:localhost:9092");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+ProducerConfig config = new ProducerConfig(props);
+<small>// send a message using default partitioner </small>
+Producer&lt;String, String&gt; producer = new Producer&lt;String, String&gt;(config);

+List&lt;String&gt; messages = new java.util.ArrayList&lt;String&gt;();
+messages.add("test-message");
+ProducerData&lt;String, String&gt; data = new ProducerData&lt;String, String&gt;("test-topic",
messages);
+producer.send(data);	
+</pre>
+</li>
+<li>Use the asynchronous producer. This buffers writes in memory until either <code>batch.size</code>
or <code>queue.time</code> is reached. After that, data is sent to the Kafka brokers
+<pre>
+Properties props = new Properties();
+props.put("zk.connect"‚ "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+props.put("producer.type", "async");
+ProducerConfig config = new ProducerConfig(props);
+Producer&lt;String, String&gt; producer = new Producer&lt;String, String&gt;(config);
+ProducerData&lt;String, String&gt; data = new ProducerData&lt;String, String&gt;("test-topic",
"test-message");
+producer.send(data);
+</pre>
+</li>
+<li>Finally, the producer should be closed:
+<pre>producer.close();</pre>
+</li>
+</ol>
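+
+Putting the snippets above together, a complete minimal producer program might look like the following sketch. The class name, topic, and message are illustrative, and the imports assume the same Java API packages used in the configuration example above:
+
+<pre>
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.producer.ProducerConfig;
+
+public class SimpleProducerExample {
+  public static void main(String[] args) {
+    <small>// create the producer with zookeeper based broker discovery</small>
+    Properties props = new Properties();
+    props.put("zk.connect", "127.0.0.1:2181");
+    props.put("serializer.class", "kafka.serializer.StringEncoder");
+    ProducerConfig config = new ProducerConfig(props);
+    Producer&lt;String, String&gt; producer = new Producer&lt;String, String&gt;(config);
+    try {
+      <small>// send a single message to an illustrative topic</small>
+      ProducerData&lt;String, String&gt; data =
+          new ProducerData&lt;String, String&gt;("test-topic", "hello from the quickstart");
+      producer.send(data);
+    } finally {
+      <small>// always close the producer to release its resources</small>
+      producer.close();
+    }
+  }
+}
+</pre>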
+
+<h4>Consumer Code</h4>
+
+The consumer code is slightly more complex as it enables multithreaded consumption:
+
+<pre>
+// specify some consumer properties
+Properties props = new Properties();
+props.put("zk.connect", "localhost:2181");
+props.put("zk.connectiontimeout.ms", "1000000");
+props.put("groupid", "test_group");
+
+// Create the connection to the cluster
+ConsumerConfig consumerConfig = new ConsumerConfig(props);
+ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+
+// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
+Map&lt;String, List&lt;KafkaMessageStream&gt;&gt; topicMessageStreams = 
+    consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
+List&lt;KafkaMessageStream&gt; streams = topicMessageStreams.get("test");
+
+// create list of 4 threads to consume from each of the partitions 
+ExecutorService executor = Executors.newFixedThreadPool(4);
+
+// consume the messages in the threads
+for(final KafkaMessageStream stream: streams) {
+  executor.submit(new Runnable() {
+    public void run() {
+      for(Message message: stream) {
+        // process message
+      }	
+    }
+  });
+}
+</pre>
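+
+When the application is done consuming, the connector and the thread pool should be shut down. A minimal sketch, assuming the objects created above:
+
+<pre>
+<small>// close the zookeeper connection and release the consumer's resources</small>
+consumerConnector.shutdown();
+<small>// stop the consumption threads</small>
+executor.shutdown();
+</pre>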
+
+<h4>Hadoop Consumer</h4>
+
+<p>
+Providing a horizontally scalable solution for aggregating and loading data into Hadoop was
one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which
spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely
fast pull-based Hadoop data load capabilities (we were able to fully saturate the network
with only a handful of Kafka servers).
+</p>
+
+<p>
+Usage information on the hadoop consumer can be found <a href="https://github.com/kafka-dev/kafka/tree/master/contrib/hadoop-consumer">here</a>.
+</p>
+
+<h4>Simple Consumer</h4>
+
+Kafka has a lower-level consumer API for reading message chunks directly from servers. Under most circumstances this should not be needed, but just in case, its usage is as follows:
+
+<pre>
+import kafka.api.FetchRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageSet;
+import kafka.utils.Utils;
+
+...
+
+<small>// create a consumer to connect to the kafka server running on localhost, port
9092, socket timeout of 10 secs, socket receive buffer of ~1MB</small>
+SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
+
+long offset = 0;
+while (true) {
+  <small>// create a fetch request for topic “test”, partition 0, current
offset, and fetch size of 1MB</small>
+  FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);
+
+  <small>// get the message set from the consumer and print them out</small>
+  ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
+  for(Message message : messages) {
+    System.out.println("consumed: " + Utils.toString(message.payload(), "UTF-8"));
+    <small>// advance the offset after consuming each message</small>
+    offset += MessageSet.entrySize(message);
+  }
+}
+</pre>
+
+<!--#include virtual="includes/footer.html" -->
+

Added: incubator/kafka/site/styles.css
URL: http://svn.apache.org/viewvc/incubator/kafka/site/styles.css?rev=1153025&view=auto
==============================================================================
--- incubator/kafka/site/styles.css (added)
+++ incubator/kafka/site/styles.css Tue Aug  2 04:31:40 2011
@@ -0,0 +1,81 @@
+html, body {
+	font-family: Arial, sans-serif;
+	margin: 0px;
+	padding: 0px;
+	background-color: #fff;
+	color: #222;
+	line-height: 150%;
+	font-size: 11pt;
+}
+code, pre {
+	font: 1em/normal "courier new", courier, monospace;
+}
+h1, h2, h3, h4 {
+  color: #2e4a8e;
+}
+a {
+	color: #2e4a8e;
+	text-decoration: none;
+}
+#header {
+	xmargin: 3em -2em 1em 0;
+	padding: 1.2em 0 1.2em 3em;
+	border-width: 0px;
+	background-color: #2e4a8e;
+	xwidth: 80%;
+	min-width: 900px;
+}
+.title {
+	color: white;
+	font-size: 24pt;
+	margin: 2px;
+}
+.subtitle, .projects, .projects a {
+	color: white;
+	font-size: 14pt;
+	font-style: italic;
+	margin: 2px;
+	line-height: 1.5;
+}
+.projects, .projects a {
+	font-style: normal;
+	font-size: 11pt;
+}
+.lsidebar {
+	float: left;
+	font-size: 12pt;
+	color: #2e4a8e;
+	width: 150px;
+}
+.lsidebar li {
+	list-style-type: none;
+}
+.lsidebar li a {
+	text-decoration: none;
+	color: #2e4a8e;
+}
+.content {
+	width: 700px;
+	margin-left: 200px;
+	xpadding: 10px;
+	min-height: 800px;
+}
+.numeric {
+  text-align: right;
+}
+.data-table {
+  border: 1px solid #a9a9a9;
+  border-collapse: collapse;
+}
+.data-table td, .data-table th {
+  border: 1px solid #888;
+  padding: 2px;
+}
+.data-table th {
+  background-color: #ccc;
+  font-weight: bold;
+}
+.advert-message {
+  border: 3px solid #2e4a8e;
+  padding: 15px;
+}
\ No newline at end of file


