kafka-commits mailing list archives

From jjko...@apache.org
Subject svn commit: r1656620 - in /kafka/site/082: configuration.html implementation.html
Date Tue, 03 Feb 2015 01:59:07 GMT
Author: jjkoshy
Date: Tue Feb  3 01:59:07 2015
New Revision: 1656620

URL: http://svn.apache.org/r1656620
KAFKA-1729; Add documentation for Kafka-based offset management; reviewed by Jun Rao and Gwen


Modified: kafka/site/082/configuration.html
URL: http://svn.apache.org/viewvc/kafka/site/082/configuration.html?rev=1656620&r1=1656619&r2=1656620&view=diff
--- kafka/site/082/configuration.html (original)
+++ kafka/site/082/configuration.html Tue Feb  3 01:59:07 2015
@@ -384,6 +384,53 @@ ZooKeeper also allows you to add a "chro
       <td>Enable delete topic.</td>
+    <tr>
+      <td>offsets.topic.num.partitions</td>
+      <td>50</td>
+      <td>The number of partitions for the offset commit topic. Since changing this
after deployment is currently unsupported, we recommend using a higher setting for production
(e.g., 100-200).</td>
+    </tr>
+    <tr>
+      <td>offsets.topic.retention.minutes</td>
+      <td>1440</td>
+      <td>Offsets that are older than this age will be marked for deletion. The actual
purge will occur when the log cleaner compacts the offsets topic.</td>
+    </tr>
+    <tr>
+      <td>offsets.retention.check.interval.ms</td>
+      <td>600000</td>
+      <td>The frequency at which the offset manager checks for stale offsets.</td>
+    </tr>
+    <tr>
+      <td>offsets.topic.replication.factor</td>
+      <td>3</td>
+      <td>The replication factor for the offset commit topic. A higher setting (e.g.,
three or four) is recommended in order to ensure higher availability. If the offsets topic
is created when fewer brokers than the replication factor are available, then the offsets topic
will be created with fewer replicas.</td>
+    </tr>
+    <tr>
+      <td>offsets.topic.segment.bytes</td>
+      <td>104857600</td>
+      <td>Segment size for the offsets topic. Since it uses a compacted topic, this
should be kept relatively low in order to facilitate faster log compaction and loads.</td>
+    </tr>
+    <tr>
+      <td>offsets.load.buffer.size</td>
+      <td>5242880</td>
+      <td>An offset load occurs when a broker becomes the offset manager for a set
of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting
corresponds to the batch size (in bytes) to use when reading from the offsets segments when
loading offsets into the offset manager's cache.</td>
+    </tr>
+    <tr>
+      <td>offsets.topic.compression.codec</td>
+      <td>none</td>
+      <td>(Should not be used until KAFKA-1374 is implemented.) Compression codec for
the offsets topic. Compression should be enabled in order to achieve "atomic" commits.</td>
+    </tr>
+    <tr>
+      <td>offsets.commit.required.acks</td>
+      <td>-1</td>
+      <td>The number of acknowledgements that are required before the offset commit
can be accepted. This is similar to the producer's acknowledgement setting. In general, the
default should not be overridden.</td>
+    </tr>
+    <tr>
+      <td>offsets.commit.timeout.ms</td>
+      <td>5000</td>
+      <td>The offset commit will be delayed until this timeout or the required number
of replicas have received the offset commit. This is similar to the producer request timeout.</td>
+    </tr>
 <p>More details about broker configuration can be found in the scala class <code>kafka.server.KafkaConfig</code>.</p>
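As a rough illustration of how offsets.topic.retention.minutes and offsets.retention.check.interval.ms interact, the following Python sketch assumes that an offset becomes stale once its age exceeds the retention period, and that stale offsets are only noticed at the next periodic check. The function names and the strict comparison are assumptions made for illustration; this is not the broker's implementation.

```python
# Defaults taken from the configuration table in this diff.
OFFSETS_TOPIC_RETENTION_MINUTES = 1440
OFFSETS_RETENTION_CHECK_INTERVAL_MS = 600000


def is_stale(commit_timestamp_ms, now_ms,
             retention_minutes=OFFSETS_TOPIC_RETENTION_MINUTES):
    """Return True if the offset is older than the retention period.

    Assumption: "older than" is treated as a strict comparison.
    """
    return now_ms - commit_timestamp_ms > retention_minutes * 60 * 1000


def max_age_before_marked_ms(
        retention_minutes=OFFSETS_TOPIC_RETENTION_MINUTES,
        check_interval_ms=OFFSETS_RETENTION_CHECK_INTERVAL_MS):
    """Upper bound on an offset's age when it is marked for deletion.

    Assumption: checks run exactly once per check interval.
    """
    return retention_minutes * 60 * 1000 + check_interval_ms
```

Under these assumptions and the default values, an offset could be about 24 hours and 10 minutes old before it is marked for deletion. The actual purge still waits for the log cleaner to compact the offsets topic.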
@@ -637,6 +684,36 @@ The essential consumer configurations ar
       <td colspan="1">2000</td>
       <td>How far a ZK follower can be behind a ZK leader</td>
+    <tr>
+      <td>offsets.storage</td>
+      <td colspan="1">zookeeper</td>
+      <td>Select where offsets should be stored (zookeeper or kafka).</td>
+    </tr>
+    <tr>
+      <td>offsets.channel.backoff.ms</td>
+      <td colspan="1">1000</td>
+      <td>The backoff period when reconnecting the offsets channel or retrying failed
offset fetch/commit requests.</td>
+    </tr>
+    <tr>
+      <td>offsets.channel.socket.timeout.ms</td>
+      <td colspan="1">10000</td>
+      <td>Socket timeout when reading responses for offset fetch/commit requests. This
timeout is also used for ConsumerMetadata requests that are used to query for the offset manager.</td>
+    </tr>
+    <tr>
+      <td>offsets.commit.max.retries</td>
+      <td colspan="1">5</td>
+      <td>Retry the offset commit up to this many times on failure. This retry count
only applies to offset commits during shut-down. It does not apply to commits originating
from the auto-commit thread. It also does not apply to attempts to query for the offset coordinator
before committing offsets; i.e., if a consumer metadata request fails for any reason, it will
be retried and that retry does not count toward this limit.</td>
+    </tr>
+    <tr>
+      <td>dual.commit.enabled</td>
+      <td colspan="1">true</td>
+      <td>If you are using "kafka" as offsets.storage, you can dual commit offsets
to ZooKeeper (in addition to Kafka). This is required during migration from zookeeper-based
offset storage to kafka-based offset storage. With respect to any given consumer group, it
is safe to turn this off after all instances within that group have been migrated to the new
version that commits offsets to the broker (instead of directly to ZooKeeper).</td>
+    </tr>
+    <tr>
+      <td>partition.assignment.strategy</td>
+      <td colspan="1">range</td>
+      <td><p>Select between the "range" or "roundrobin" strategy for assigning
partitions to consumer streams.<p>The round-robin partition assignor lays out all the
available partitions and all the available consumer threads. It then proceeds to do a round-robin
assignment from partition to consumer thread. If the subscriptions of all consumer instances
are identical, then the partitions will be uniformly distributed (i.e., the partition ownership
counts will differ by at most one across all consumer threads). Round-robin assignment
is permitted only if: (a) every topic has the same number of streams within a consumer instance, and
(b) the set of subscribed topics is identical for every consumer instance within the group.<p>
Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions
in numeric order and the consumer threads in lexicographic order. We then divide the number
of partitions by the total number of consumer streams (threads) to determine the number of
partitions to assign to each consumer. If it does not evenly divide,
then the first few consumers will have one extra partition.</td>
+    </tr>
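The two strategies described for partition.assignment.strategy can be sketched in a few lines of Python. This is a simplified illustration and not Kafka's implementation: the function names and the thread ids in the usage note are hypothetical, the round-robin sketch orders partitions by plain sorting, and both functions assume a non-empty list of consumer threads.

```python
from itertools import cycle


def range_assign(partitions, threads):
    """Range strategy for a single topic.

    Partitions are laid out in numeric order and threads in lexicographic
    order; each thread gets a contiguous block, and the first few threads
    get one extra partition when the division is not even.
    """
    parts = sorted(partitions)
    ordered = sorted(threads)
    per_thread, extra = divmod(len(parts), len(ordered))
    assignment = {}
    start = 0
    for i, thread in enumerate(ordered):
        count = per_thread + (1 if i < extra else 0)
        assignment[thread] = parts[start:start + count]
        start += count
    return assignment


def round_robin_assign(topic_partitions, threads):
    """Round-robin strategy across all topics.

    All (topic, partition) pairs are laid out and handed to the threads
    in turn, so ownership counts differ by at most one.
    """
    ordered = sorted(threads)
    assignment = {thread: [] for thread in ordered}
    for tp, thread in zip(sorted(topic_partitions), cycle(ordered)):
        assignment[thread].append(tp)
    return assignment
```

For example, `range_assign(range(5), ["c1-0", "c2-0"])` gives the first thread partitions 0 to 2 and the second thread partitions 3 and 4, matching the "first few consumers will have one extra partition" rule.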

Modified: kafka/site/082/implementation.html
URL: http://svn.apache.org/viewvc/kafka/site/082/implementation.html?rev=1656620&r1=1656619&r2=1656620&view=diff
--- kafka/site/082/implementation.html (original)
+++ kafka/site/082/implementation.html Tue Feb  3 01:59:07 2015
@@ -225,6 +225,35 @@ Note that two kinds of corruption must b
 <h3><a id="distributionimpl">5.6 Distribution</a></h3>
+<h4>Consumer Offset Tracking</h4>
+The high-level consumer tracks the maximum offset it has consumed in each partition and periodically
commits its offset vector so that it can resume from those offsets in the event of a restart.
Kafka provides the option to store all the offsets for a given consumer group in a designated
broker (for that group) called the <i>offset manager</i>. That is, any consumer instance
in that consumer group should send its offset commits and fetches to that offset manager (broker).
The high-level consumer handles this automatically. If you use the simple consumer you will
need to discover the offset manager and explicitly commit or fetch offsets. A consumer can
look up its offset manager by issuing a ConsumerMetadataRequest to any Kafka broker and reading
the ConsumerMetadataResponse, which will contain the offset manager. The consumer can then
proceed to commit or fetch offsets from the offset manager broker. In case the offset manager
moves, the consumer will need to rediscover the offset manager. If you wish to manage your
offsets manually, you can take a look at these <a href="https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka">code
samples that explain how to issue OffsetCommitRequest and OffsetFetchRequest</a>.
+When the offset manager receives an OffsetCommitRequest, it appends the request to a special
<a href="#compaction">compacted</a> Kafka topic named <i>__consumer_offsets</i>.
The offset manager sends a successful offset commit response to the consumer only after all
the replicas of the offsets topic receive the offsets. In case the offsets fail to replicate
within a configurable timeout, the offset commit will fail and the consumer may retry the
commit after backing off. (This is done automatically by the high-level consumer.) The brokers
periodically compact the offsets topic since it only needs to maintain the most recent offset
commit per partition. The offset manager also caches the offsets in an in-memory table in
order to serve offset fetches quickly.
+When the offset manager receives an offset fetch request, it simply returns the last committed
offset vector from the offsets cache. In case the offset manager was just started or if it
just became the offset manager for a new set of consumer groups (by becoming a leader for
a partition of the offsets topic), it may need to load the offsets topic partition into the
cache. In this case, the offset fetch will fail with an OffsetsLoadInProgress exception and
the consumer may retry the OffsetFetchRequest after backing off. (This is done automatically
by the high-level consumer.)
+<h5><a id="offsetmigration">Migrating offsets from ZooKeeper to Kafka</a></h5>
+Kafka consumers in earlier releases store their offsets by default in ZooKeeper. It is possible
to migrate these consumers to commit offsets into Kafka by following these steps:
+   <li>Set <code>offsets.storage=kafka</code> and <code>dual.commit.enabled=true</code>
in your consumer config.
+   </li>
+   <li>Do a rolling bounce of your consumers and then verify that your consumers are
+   </li>
+   <li>Set <code>dual.commit.enabled=false</code> in your consumer config.
+   </li>
+   <li>Do a rolling bounce of your consumers and then verify that your consumers are
+   </li>
+A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be performed using the
above steps if you set <code>offsets.storage=zookeeper</code>.
 <h4>ZooKeeper Directories</h4>
 The following gives the ZooKeeper structures and algorithms used for co-ordination between
consumers and brokers.
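The consumer settings for the two phases of the ZooKeeper-to-Kafka offset migration described above can be sketched as follows. The helper and phase names are hypothetical and exist only for illustration; the property names and values are the ones given in the migration steps.

```python
# Hypothetical phase names mapped to the consumer properties from the
# migration steps: first dual-commit, then Kafka-only.
MIGRATION_PHASES = {
    "dual-commit": {"offsets.storage": "kafka", "dual.commit.enabled": "true"},
    "kafka-only": {"offsets.storage": "kafka", "dual.commit.enabled": "false"},
}


def consumer_overrides(phase):
    """Return the consumer config overrides for one migration phase."""
    return dict(MIGRATION_PHASES[phase])


def to_properties(overrides):
    """Render overrides as key=value lines, sorted by key for stable output."""
    return "\n".join(f"{key}={value}" for key, value in sorted(overrides.items()))
```

Each phase is followed by a rolling bounce of the consumers, as the steps state. For a roll-back, the same procedure applies with <code>offsets.storage=zookeeper</code>.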
@@ -256,7 +285,7 @@ Each broker registers itself under the t
 <h4>Consumers and Consumer Groups</h4>
-Consumers of topics also register themselves in ZooKeeper, in order to balance the consumption
of data and track their offsets in each partition for each broker they consume from.
+Consumers of topics also register themselves in ZooKeeper, in order to coordinate with each
other and balance the consumption of data. Consumers can also store their offsets in ZooKeeper
by setting <code>offsets.storage=zookeeper</code>. However, this offset storage
mechanism will be deprecated in a future release. Therefore, it is recommended to <a href="#offsetmigration">migrate
offsets storage to Kafka</a>.
@@ -277,9 +306,9 @@ In addition to the group_id which is sha
 Each of the consumers in the group registers under its group and creates a znode with its
consumer_id. The value of the znode contains a map of &lt;topic, #streams&gt;. This
id is simply used to identify each of the consumers which is currently active within a group.
This is an ephemeral node so it will disappear if the consumer process dies.
-<h4>Consumer Offset Tracking</h4>
+<h4>Consumer Offsets</h4>
-Consumers track the maximum offset they have consumed in each partition. This value is stored
in a ZooKeeper directory
+Consumers track the maximum offset they have consumed in each partition. This value is stored
in a ZooKeeper directory if <code>offsets.storage=zookeeper</code>.
 /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
(persistent node)
