kafka-commits mailing list archives

From joest...@apache.org
Subject svn commit: r1354776 - in /incubator/kafka/site: design.html downloads.html includes/header.html quickstart.html
Date Wed, 27 Jun 2012 23:48:41 GMT
Author: joestein
Date: Wed Jun 27 23:48:40 2012
New Revision: 1354776

URL: http://svn.apache.org/viewvc?rev=1354776&view=rev
Log:
KAFKA-375 update the site page for the 0.7.1 release download and related changes

Modified:
    incubator/kafka/site/design.html
    incubator/kafka/site/downloads.html
    incubator/kafka/site/includes/header.html
    incubator/kafka/site/quickstart.html

Modified: incubator/kafka/site/design.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/design.html?rev=1354776&r1=1354775&r2=1354776&view=diff
==============================================================================
--- incubator/kafka/site/design.html (original)
+++ incubator/kafka/site/design.html Wed Jun 27 23:48:40 2012
@@ -330,7 +330,7 @@ The partition API uses the key and the n
 We have 2 levels of consumer APIs. The low-level "simple" API maintains a connection to a
single broker and has a close correspondence to the network requests sent to the server. This
API is completely stateless, with the offset being passed in on every request, allowing the
user to maintain this metadata however they choose.
 </p>
 <p>
-The high-level API hides the details of brokers from the consumer and allows consuming off
the cluster of machines without concern for the underlying topology. It also maintains the
state of what has been consumed.
+The high-level API hides the details of brokers from the consumer and allows consuming off
the cluster of machines without concern for the underlying topology. It also maintains the
state of what has been consumed. The high-level API also provides the ability to subscribe
to topics that match a filter expression (i.e., either a whitelist or a blacklist regular
expression).
 </p>
 
 <h4>Low-level API</h4>
@@ -365,12 +365,21 @@ ConsumerConnector connector = Consumer.c
 interface ConsumerConnector {
 	
   /**
-   * This method is used to get a list of KafkaMessageStreams, which are iterators over topic.
+   * This method is used to get a list of KafkaStreams, which are iterators over
+   * MessageAndMetadata objects from which you can obtain messages and their
+   * associated metadata (currently only topic).
    *  Input: a map of &lt;topic, #streams&gt;
    *  Output: a map of &lt;topic, list of message streams&gt;
-   *          Each message stream supports a message iterator.
    */
-  public Map&lt;String,List&lt;KafkaMessageStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt;
topicCountMap); 
+  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt;
topicCountMap); 
+
+  /**
+   * You can also obtain a list of KafkaStreams, that iterate over messages
+   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
+   * whitelist or a blacklist which is a standard Java regex.)
+   */
+  public List&lt;KafkaStream&gt; createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams);
 
   /* Commit the offsets of all messages consumed so far. */
   public commitOffsets()
@@ -380,10 +389,10 @@ interface ConsumerConnector {
 }
 </pre>
 <p>
-This API is centered around iterators, implemented by the KafkaMessageStream class. Each
KafkaMessageStream represents the stream of messages from one or more partitions on one or
more servers. Each stream is used for single threaded processing, so the client can provide
the number of desired streams in the create call. Thus a stream may represent the merging
of multiple server partitions (to correspond to the number of processing threads), but each
partition only goes to one stream.
+This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream
represents the stream of messages from one or more partitions on one or more servers. Each
stream is used for single threaded processing, so the client can provide the number of desired
streams in the create call. Thus a stream may represent the merging of multiple server partitions
(to correspond to the number of processing threads), but each partition only goes to one stream.
 </p>
 <p>
-The create call registers the consumer for the topic, which results in rebalancing the consumer/broker
assignment. To minimize this rebalancing the API encourages creating many topic streams in
a single call.	
+The createMessageStreams call registers the consumer for the topic, which results in rebalancing
the consumer/broker assignment. The API encourages creating many topic streams in a single
call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally)
registers watchers to discover new topics that match its filter. Note that each stream that
createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e.,
if multiple topics are allowed by the filter).
 </p>
 <h2>Network Layer</h2>
 <p>
@@ -392,6 +401,33 @@ The network layer is a fairly straight-f
 <h2>Messages</h2>
 <p>
 Messages consist of a fixed-size header and variable length opaque byte array payload. The
header contains a format version and a CRC32 checksum to detect corruption or truncation.
Leaving the payload opaque is the right decision: there is a great deal of progress being
made on serialization libraries right now, and any particular choice is unlikely to be right
for all uses. Needless to say a particular application using Kafka would likely mandate a
particular serialization type as part of its usage. The <code>MessageSet</code>
interface is simply an iterator over messages with specialized methods for bulk reading and
writing to an NIO <code>Channel</code>.
+
+<h2>Message Format</h2>
+
+<pre>
+	/** 
+	 * A message. The format of an N byte message is the following: 
+	 * 
+	 * If magic byte is 0 
+	 * 
+	 * 1. 1 byte "magic" identifier to allow format changes 
+	 * 
+	 * 2. 4 byte CRC32 of the payload 
+	 * 
+	 * 3. N - 5 byte payload 
+	 * 
+	 * If magic byte is 1 
+	 * 
+	 * 1. 1 byte "magic" identifier to allow format changes 
+	 * 
+	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of
the version (e.g. compression enabled, type of codec used) 
+	 * 
+	 * 3. 4 byte CRC32 of the payload 
+	 * 
+	 * 4. N - 6 byte payload 
+	 * 
+	 */
+</pre>
 </p>
 <h2>Log</h2>
 <p>
@@ -545,9 +581,10 @@ When a consumer starts, it does the foll
 <ol>
    <li> Register itself in the consumer id registry under its group.
    </li>
-   <li> Register a watch on changes (new consumers joining or any existing consumers
leaving) under the consumer id registry. Each change triggers rebalancing among all consumers
within the group to which the changed consumer belongs.
+   <li> Register a watch on changes (new consumers joining or any existing consumers
leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers
within the group to which the changed consumer belongs.)
    </li>
-   <li> Register a watch on changes (new brokers joining or any existing brokers leaving)
under the broker id registry.  Each change triggers rebalancing among all consumers in all
consumer groups. </li>
+   <li> Register a watch on changes (new brokers joining or any existing brokers leaving)
under the broker id registry. (Each change triggers rebalancing among all consumers in all
consumer groups.) </li>
+   <li> If the consumer creates a message stream using a topic filter, it also registers
a watch on changes (new topics being added) under the broker topic registry. (Each change
will trigger re-evaluation of the available topics to determine which topics are allowed by
the topic filter. A new allowed topic will trigger rebalancing among all consumers within
the consumer group.)</li>
    <li> Force itself to rebalance within in its consumer group.
    </li>
 </ol>
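
The Message Format comment added to design.html in the hunk above describes two byte layouts, selected by the magic byte. They can be sketched in plain Java roughly as follows. This is a minimal illustration and not Kafka's own Message class: the names MessageFormatSketch, encodeV0, encodeV1 and decodePayload are hypothetical, and big-endian byte order for the CRC field is an assumption, since the comment does not state it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical illustration of the two layouts; not Kafka's Message class. */
class MessageFormatSketch {

    /** Magic byte 0: 1 byte magic, 4 byte CRC32 of the payload, then the payload. */
    static byte[] encodeV0(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + payload.length);
        buf.put((byte) 0);
        buf.putInt((int) crc32(payload)); // assumed big-endian (ByteBuffer default)
        buf.put(payload);
        return buf.array();
    }

    /** Magic byte 1: 1 byte magic, 1 byte attributes, 4 byte CRC32, then the payload. */
    static byte[] encodeV1(byte attributes, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 1 + 4 + payload.length);
        buf.put((byte) 1);
        buf.put(attributes);
        buf.putInt((int) crc32(payload));
        buf.put(payload);
        return buf.array();
    }

    /** Reads either layout, checks the stored CRC32, and returns the payload. */
    static byte[] decodePayload(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic == 1) {
            buf.get(); // skip the attributes byte
        } else if (magic != 0) {
            throw new IllegalArgumentException("unknown magic byte: " + magic);
        }
        long storedCrc = buf.getInt() & 0xFFFFFFFFL; // read as unsigned 32-bit
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        if (crc32(payload) != storedCrc) {
            throw new IllegalStateException("CRC32 mismatch: corrupt or truncated message");
        }
        return payload;
    }

    static long crc32(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] v0 = encodeV0(payload);
        byte[] v1 = encodeV1((byte) 0, payload);
        System.out.println("v0 length = " + v0.length + ", v1 length = " + v1.length);
        System.out.println(new String(decodePayload(v1), StandardCharsets.UTF_8));
    }
}
```

For an N byte message this gives a payload of N - 5 bytes under magic 0 and N - 6 bytes under magic 1, matching the comment.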

Modified: incubator/kafka/site/downloads.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/downloads.html?rev=1354776&r1=1354775&r2=1354776&view=diff
==============================================================================
--- incubator/kafka/site/downloads.html (original)
+++ incubator/kafka/site/downloads.html Wed Jun 27 23:48:40 2012
@@ -2,20 +2,29 @@
 
 <h2>Downloads</h2>
 
-The current stable version is 0.7.0-incubating. You can verify your download by following
these <a href="http://www.apache.org/info/verification.html">procedures</a> and
using these <a href="http://svn.apache.org/repos/asf/incubator/kafka/KEYS">KEYS</a>.
+The current stable version is 0.7.1-incubating. You can verify your download by following
these <a href="http://www.apache.org/info/verification.html">procedures</a> and
using these <a href="http://svn.apache.org/repos/asf/incubator/kafka/KEYS">KEYS</a>.
 
-<h3>0.7.0 Release</h3>
+<h3>0.7.1 Release</h3>
 <ul>
 	<li>
-		<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/RELEASE-NOTES.html">Release
Notes</a>
+		<a href="https://www.apache.org/dyn/closer.cgi/incubator/kafka/kafka-0.7.1-incubating/RELEASE-NOTES.html">Release
Notes</a>
 	</li>
  	<li>
-		Download: <a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz">kafka-0.7.0-incubating-src.tar.gz</a>
(<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz.asc">asc</a>,
<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz.md5">md5</a>)
+		Download: <a href="https://www.apache.org/dyn/closer.cgi/incubator/kafka/kafka-0.7.1-incubating/kafka-0.7.1-incubating-src.tgz">kafka-0.7.1-incubating-src.tgz</a>
(<a href="https://www.apache.org/dyn/closer.cgi/incubator/kafka/kafka-0.7.1-incubating/kafka-0.7.1-incubating-src.tgz.asc">asc</a>)
 	</li>
 </ul>
 <h3>Older Releases</h3>
+	<h4>0.7.0 Release</h4>
+	<ul>
+		<li>
+			<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/RELEASE-NOTES.html">Release
Notes</a>
+		</li>
+	 	<li>
+			Download: <a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz">kafka-0.7.0-incubating-src.tar.gz</a>
(<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz.asc">asc</a>,
<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz.md5">md5</a>)
+		</li>
+	</ul>
 <p>
-You can download the previous releases <a href="http://sna-projects.com/kafka/downloads.php">here</a>.
+You can download releases previous to 0.7.0-incubating <a href="http://sna-projects.com/kafka/downloads.php">here</a>.
 </p>
 
 <h3>Disclaimer</h3>

Modified: incubator/kafka/site/includes/header.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/includes/header.html?rev=1354776&r1=1354775&r2=1354776&view=diff
==============================================================================
--- incubator/kafka/site/includes/header.html (original)
+++ incubator/kafka/site/includes/header.html Wed Jun 27 23:48:40 2012
@@ -26,7 +26,7 @@
 			<div class="lsidebar">
 				<ul>
 					<li><a href="downloads.html">download</a></li>
-					<li><a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/docs">api&nbsp;docs</a></li>
+					<li><a href="http://people.apache.org/~joestein/kafka-0.7.1-incubating-docs">api&nbsp;docs</a></li>
 					<li><a href="quickstart.html">quickstart</a></li>
 					<li><a href="design.html">design</a></li>
 					<li><a href="configuration.html">configuration</a></li>

Modified: incubator/kafka/site/quickstart.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/quickstart.html?rev=1354776&r1=1354775&r2=1354776&view=diff
==============================================================================
--- incubator/kafka/site/quickstart.html (original)
+++ incubator/kafka/site/quickstart.html Wed Jun 27 23:48:40 2012
@@ -245,19 +245,19 @@ ConsumerConfig consumerConfig = new Cons
 ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
 
 // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
-Map&lt;String, List&lt;KafkaMessageStream&lt;Message&gt;&gt;&gt;
topicMessageStreams = 
+Map&lt;String, List&lt;KafkaStream&lt;Message&gt;&gt;&gt; topicMessageStreams
= 
     consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
-List&lt;KafkaMessageStream&lt;Message&gt;&gt; streams = topicMessageStreams.get("test");
+List&lt;KafkaStream&lt;Message&gt;&gt; streams = topicMessageStreams.get("test");
 
 // create list of 4 threads to consume from each of the partitions 
 ExecutorService executor = Executors.newFixedThreadPool(4);
 
 // consume the messages in the threads
-for(final KafkaMessageStream&lt;Message&gt; stream: streams) {
+for(final KafkaStream&lt;Message&gt; stream: streams) {
   executor.submit(new Runnable() {
     public void run() {
-      for(Message message: stream) {
-        // process message
+      for(MessageAndMetadata msgAndMetadata: stream) {
+        // process message (msgAndMetadata.message())
       }	
     }
   });
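
The design.html change in this commit describes a topic filter as a whitelist or a blacklist given as a standard Java regex. The matching rule can be sketched with java.util.regex alone, as below. TopicFilterSketch and its methods are hypothetical names used for illustration and are not the TopicFilter class shipped with Kafka; matching against the whole topic name (rather than a substring) is also an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical illustration of whitelist/blacklist topic matching; not Kafka's TopicFilter. */
class TopicFilterSketch {
    private final Pattern pattern;
    private final boolean whitelist;

    /** @param whitelist true to allow only matching topics, false to allow only non-matching ones */
    TopicFilterSketch(String regex, boolean whitelist) {
        this.pattern = Pattern.compile(regex);
        this.whitelist = whitelist;
    }

    /** A whitelist allows topics that match the regex; a blacklist allows those that do not. */
    boolean isTopicAllowed(String topic) {
        boolean matches = pattern.matcher(topic).matches(); // assumed: full-name match
        return whitelist ? matches : !matches;
    }

    /** Re-evaluates a list of available topics, as would happen when a new topic appears. */
    List<String> allowedTopics(List<String> availableTopics) {
        List<String> allowed = new ArrayList<String>();
        for (String topic : availableTopics) {
            if (isTopicAllowed(topic)) {
                allowed.add(topic);
            }
        }
        return allowed;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("test1", "prod", "test2");
        System.out.println(new TopicFilterSketch("test.*", true).allowedTopics(topics));
        System.out.println(new TopicFilterSketch("test.*", false).allowedTopics(topics));
    }
}
```

In the consumer described by the diff, a newly allowed topic would additionally trigger a rebalance within the consumer group; the sketch covers only the filtering step.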


