kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-3191: Improve offset committing JavaDoc in KafkaConsumer
Date Tue, 09 Feb 2016 00:57:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 656b5f6a8 -> 552f65aa3


KAFKA-3191: Improve offset committing JavaDoc in KafkaConsumer

Added an example clarifying the correct way to use explicit offsets with commitSync().

Author: Adam Kunicki <adam@streamsets.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #850 from kunickiaj/KAFKA-3191

(cherry picked from commit 0eaede4dc95846e2b8f7452f41c58c0122e7a563)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/552f65aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/552f65aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/552f65aa

Branch: refs/heads/0.9.0
Commit: 552f65aa3911b80b1121463ce292679711fae66a
Parents: 656b5f6
Author: Adam Kunicki <adam@streamsets.com>
Authored: Mon Feb 8 16:56:59 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Feb 8 16:57:15 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 31 ++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/552f65aa/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 10fd8b9..c251c15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -217,6 +217,31 @@ import java.util.regex.Pattern;
  *     }
  * </pre>
  *
+ * The above example uses {@link #commitSync() commitSync} to mark all received messages as committed. In some cases
+ * you may wish to have even finer control over which messages have been committed by specifying an offset explicitly.
+ * In the example below we commit offset after we finish handling the messages in each partition.
+ * <p>
+ * <pre>
+ *     try {
+ *         while(running) {
+ *             ConsumerRecords&lt;String, String&gt; records = consumer.poll(Long.MAX_VALUE);
+ *             for (TopicPartition partition : records.partitions()) {
+ *                 List&lt;ConsumerRecord&lt;String, String&gt;&gt; partitionRecords = records.records(partition);
+ *                 for (ConsumerRecord&lt;String, String&gt; record : partitionRecords) {
+ *                     System.out.println(record.offset() + &quot;: &quot; + record.value());
+ *                 }
+ *                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ *                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ *             }
+ *         }
+ *     } finally {
+ *       consumer.close();
+ *     }
+ * </pre>
+ *
+ * <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
+ * Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
+ *
  * <h4>Subscribing To Specific Partitions</h4>
  *
  * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
@@ -919,7 +944,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
-     * should not be used.
+     * should not be used. The committed offset should be the next message your application will consume,
+     * i.e. lastProcessedMessageOffset + 1.
      * <p>
      * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
@@ -981,7 +1007,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
-     * should not be used.
+     * should not be used. The committed offset should be the next message your application will consume,
+     * i.e. lastProcessedMessageOffset + 1.
      * <p>
      * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
      * (if provided) or discarded.
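
Illustration (not part of the commit): the rule this change documents, "commit lastProcessedMessageOffset + 1", can be modeled without a broker. The sketch below is only a stand-in under stated assumptions: it uses plain Java collections instead of the Kafka client, partitions are represented by hypothetical string names such as "events-0", and the class and method names (NextOffsetSketch, offsetsToCommit) are invented for this example. In a real consumer the computed value would be passed to commitSync via Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), as in the JavaDoc example in the diff above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Kafka-free model of the "commit the next offset" rule. All names are hypothetical. */
public class NextOffsetSketch {

    /**
     * For each partition, returns the offset to commit after processing a batch:
     * the offset of the last processed message plus one, i.e. the next message
     * the application will read. Partitions with an empty batch are skipped,
     * because nothing was processed for them.
     */
    public static Map<String, Long> offsetsToCommit(Map<String, List<Long>> processedByPartition) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, List<Long>> entry : processedByPartition.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                continue;
            }
            long lastOffset = offsets.get(offsets.size() - 1);
            toCommit.put(entry.getKey(), lastOffset + 1);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> batch = new HashMap<>();
        batch.put("events-0", Arrays.asList(40L, 41L, 42L));
        batch.put("events-1", Arrays.asList(7L));
        // events-0 maps to 43 and events-1 maps to 8 (map iteration order is unspecified).
        System.out.println(offsetsToCommit(batch));
    }
}
```

Committing the last processed offset itself, rather than that offset plus one, would cause the last message to be delivered again after a restart or rebalance, which is the mistake the added JavaDoc note warns against.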

