kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-1328 follow up: Updated javadoc
Date Wed, 21 May 2014 05:11:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c24740c7b -> bf83131df


KAFKA-1328 follow up: Updated javadoc


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf83131d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf83131d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf83131d

Branch: refs/heads/trunk
Commit: bf83131dffbdb6c39de0135e0426701ca141f150
Parents: c24740c
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Tue May 20 22:11:19 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue May 20 22:11:19 2014 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 48 +++++++++++---------
 .../clients/consumer/ConsumerExampleTest.java   |  5 +-
 2 files changed, 29 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bf83131d/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 18bcc90..fe93afa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -57,7 +57,11 @@ import org.slf4j.LoggerFactory;
  *          for(int i = 0;i < recordsPerTopic.size();i++) {
  *               ConsumerRecord record = recordsPerTopic.get(i);
  *               // process record
- *               processedOffsets.put(record.partition(), record.offset());
+ *               try {
+ *                   processedOffsets.put(record.topicAndPartition(), record.offset());
+ *               } catch (Exception e) {
+ *                   e.printStackTrace();
+ *               }
  *          }
  *     }
  *     return processedOffsets; 
@@ -80,7 +84,7 @@ import org.slf4j.LoggerFactory;
  * consumer.subscribe("foo", "bar");
  * boolean isRunning = true;
  * while(isRunning) {
- *   Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *   Map<String, ConsumerRecords> records = consumer.poll(100);
  *   process(records);
  * }
  * consumer.close();
@@ -88,7 +92,7 @@ import org.slf4j.LoggerFactory;
  * </pre>
  * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
  * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using
- * the commit() API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
+ * the commit(boolean) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
  * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets
  * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances.
  * <pre>
@@ -105,14 +109,14 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
  *     try {
  *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *         consumedOffsets.putAll(lastConsumedOffsets);
  *         numRecords += records.size();
  *         // commit offsets for all partitions of topics foo, bar asynchronously, owned by this consumer instance
  *         if(numRecords % commitInterval == 0) 
- *           consumer.commit();
+ *           consumer.commit(false);
  *     } catch(Exception e) {
  *         try {
  *             // rewind consumer's offsets for failed partitions
@@ -155,14 +159,14 @@ import org.slf4j.LoggerFactory;
  * KafkaConsumer consumer = new KafkaConsumer(props,
  *                                            new ConsumerRebalanceCallback() {
  *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
- *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ *                                                public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
  *                                                    Map<TopicPartition, Long> newOffsets = latestCommittedOffsets;
  *                                                    if(rewindOffsets)
  *                                                        newOffsets = rewindOffsets(latestCommittedOffsets, 100);
  *                                                    consumer.seek(newOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
- *                                                    consumer.commit();
+ *                                                public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                    consumer.commit(true);
  *                                                }
  *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
  *                                                private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
@@ -179,14 +183,15 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
  *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
  *     if(numRecords % commitInterval == 0) 
- *         consumer.commit(consumedOffsets);
+ *         consumer.commit(consumedOffsets, true);
  * }
+ * consumer.commit(true);
  * consumer.close();
  * }
  * </pre>
@@ -208,19 +213,19 @@ import org.slf4j.LoggerFactory;
  * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
  * KafkaConsumer consumer = new KafkaConsumer(props,
  *                                            new ConsumerRebalanceCallback() {
- *                                                public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ *                                                public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
  *                                                    consumer.seek(lastCommittedOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ *                                                public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
  *                                                    commitOffsetsToCustomStore(offsets);
  *                                                }
  *                                                // following APIs should be implemented by the user for custom offset management
- *                                                private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) {
+ *                                                private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) {
  *                                                    return null;
  *                                                }
- *                                                private Map<TopicPartition, Long> getLastConsumedOffsets(TopicPartition... partitions) { return null; }
+ *                                                private Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> partitions) { return null; }
  *                                                private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
  *                                            });
  * consumer.subscribe("foo", "bar");
@@ -229,7 +234,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -237,6 +242,7 @@ import org.slf4j.LoggerFactory;
  *     if(numRecords % commitInterval == 0) 
  *         commitOffsetsToCustomStore(consumedOffsets);
  * }
+ * consumer.commit(true);
  * consumer.close();
  * }
  * </pre>
@@ -262,15 +268,15 @@ import org.slf4j.LoggerFactory;
  * partitions[1] = partition1;
  * consumer.subscribe(partitions);
  * // find the last committed offsets for partitions 0,1 of topic foo
- * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(partition0, partition1);
+ * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(Arrays.asList(partitions));
  * // seek to the last committed offsets to avoid duplicates
  * consumer.seek(lastCommittedOffsets);
  * // find the offsets of the latest available messages to know where to stop consumption
- * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     for(TopicPartition partition : partitions) {
@@ -280,7 +286,7 @@ import org.slf4j.LoggerFactory;
  *             isRunning = true;
  *     }
  * }
- * consumer.commit();
+ * consumer.commit(true);
  * consumer.close();
  * }
  * </pre>
@@ -304,11 +310,11 @@ import org.slf4j.LoggerFactory;
  * // seek to the last committed offsets to avoid duplicates
  * consumer.seek(lastCommittedOffsets);        
  * // find the offsets of the latest available messages to know where to stop consumption
- * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1);
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS);
+ *     Map<String, ConsumerRecords> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     // commit offsets for partitions 0,1 for topic foo to custom store
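[Editor's reference sketch] Taken together, the hunks above reduce the recommended consume-and-commit loop to the revised prototype API: poll(long) now takes the timeout in milliseconds directly (the TimeUnit argument is gone), and commit(boolean) replaces the separate commit()/commitAsync() calls. A minimal, self-contained sketch of that loop follows. It assumes the single-argument KafkaConsumer(Properties) constructor used elsewhere in this javadoc; the group.id value, the commitInterval value, and the process() helper are illustrative placeholders, not part of this commit.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "test");            // assumed group name
        props.put("enable.auto.commit", "false"); // offsets are committed manually below
        KafkaConsumer consumer = new KafkaConsumer(props);
        consumer.subscribe("foo", "bar");
        Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
        long numRecords = 0;
        long commitInterval = 100;                // assumed application setting
        boolean isRunning = true;
        while (isRunning) {
            // poll(long): timeout in milliseconds; no TimeUnit parameter anymore
            Map<String, ConsumerRecords> records = consumer.poll(100);
            Map<TopicPartition, Long> lastConsumedOffsets = process(records);
            consumedOffsets.putAll(lastConsumedOffsets);
            numRecords += records.size();
            if (numRecords % commitInterval == 0)
                consumer.commit(false);           // commit(false): non-blocking commit
        }
        consumer.commit(true);                    // commit(true): block until acknowledged
        consumer.close();
    }

    // placeholder for application logic; returns the highest offset consumed per partition
    private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
        return new HashMap<TopicPartition, Long>();
    }
}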

http://git-wip-us.apache.org/repos/asf/kafka/blob/bf83131d/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
index 0548fb4..29ad25e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
@@ -53,9 +53,8 @@ public class ConsumerExampleTest {
 
     /**
      * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load
-     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using
-     * either the commit() or commitAsync() APIs. This example also demonstrates rewinding the consumer's offsets if processing of consumed
-     * messages fails.
+     * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using the
+     * commit() API. This example also demonstrates rewinding the consumer's offsets if processing of consumed messages fails.
      */
 //     @Test
 //     public void testConsumerGroupManagementWithManualOffsetCommit() {
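[Editor's reference sketch] The other signature change running through these hunks is that ConsumerRebalanceCallback methods now take a Collection<TopicPartition> instead of TopicPartition... varargs. A minimal implementation against that revised contract, assuming the prototype Consumer interface with committed(Collection), seek(Map), and commit(boolean) exactly as used in the javadoc above:

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
import org.apache.kafka.common.TopicPartition;

public class SyncCommitRebalanceCallback implements ConsumerRebalanceCallback {
    // on assignment, resume each newly owned partition from its last committed offset
    public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(partitions);
        consumer.seek(lastCommittedOffsets);
    }

    // before giving up ownership, commit synchronously so no consumed offsets are lost
    public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
        consumer.commit(true);
    }
}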

