kafka-commits mailing list archives

From jun...@apache.org
Subject [12/17] kafka-site git commit: update 0.9.0 docs
Date Fri, 13 Nov 2015 17:40:34 GMT
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/e047c4b2/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
----------------------------------------------------------------------
diff --git a/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html b/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
index b748b04..a9f9789 100644
--- a/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
+++ b/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
@@ -2,15 +2,15 @@
 <!-- NewPage -->
 <html lang="en">
 <head>
-<!-- Generated by javadoc (version 1.7.0_51) on Fri Feb 13 15:47:44 PST 2015 -->
-<title>KafkaConsumer (clients 0.8.3-SNAPSHOT API)</title>
-<meta name="date" content="2015-02-13">
+<!-- Generated by javadoc (version 1.7.0_80) on Fri Nov 13 08:33:05 PST 2015 -->
+<title>KafkaConsumer (clients 0.9.0.0 API)</title>
+<meta name="date" content="2015-11-13">
 <link rel="stylesheet" type="text/css" href="../../../../../stylesheet.css" title="Style">
 </head>
 <body>
 <script type="text/javascript"><!--
     if (location.href.indexOf('is-external=true') == -1) {
-        parent.document.title="KafkaConsumer (clients 0.8.3-SNAPSHOT API)";
+        parent.document.title="KafkaConsumer (clients 0.9.0.0 API)";
     }
 //-->
 </script>
@@ -35,7 +35,7 @@
 </div>
 <div class="subNav">
 <ul class="navList">
-<li><a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRecords.html" title="class in org.apache.kafka.clients.consumer"><span class="strong">Prev Class</span></a></li>
+<li><a href="../../../../../org/apache/kafka/clients/consumer/InvalidOffsetException.html" title="class in org.apache.kafka.clients.consumer"><span class="strong">Prev Class</span></a></li>
 <li><a href="../../../../../org/apache/kafka/clients/consumer/MockConsumer.html" title="class in org.apache.kafka.clients.consumer"><span class="strong">Next Class</span></a></li>
 </ul>
 <ul class="navList">
@@ -99,21 +99,20 @@
 </dl>
 <hr>
 <br>
-<pre>public class <span class="strong">KafkaConsumer&lt;K,V&gt;</span>
+<pre>@InterfaceStability.Unstable
+public class <span class="strong">KafkaConsumer&lt;K,V&gt;</span>
 extends java.lang.Object
 implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;K,V&gt;</pre>
 <div class="block">A Kafka client that consumes records from a Kafka cluster.
  <p>
  It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
- data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of
+ data it fetches migrate within the cluster. This client also interacts with the server to allow groups of
  consumers to load balance consumption using consumer groups (as described below).
  <p>
- The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
+ The consumer maintains TCP connections to the necessary brokers to fetch data.
  Failure to close the consumer after use will leak these connections.
- <p>
- The consumer is thread safe but generally will be used only from within a single thread. The consumer client has no
- threads of it's own, all work is done in the caller's thread when calls are made on the various methods exposed.
- 
+ The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
+
  <h3>Offsets and Consumer Position</h3>
  Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
  a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
@@ -124,21 +123,26 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
  every time the consumer calls <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(long)</code></a> and receives messages.
  <p>
- The <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commit(org.apache.kafka.clients.consumer.CommitType)"><code>committed position</code></a> is the last offset that has been saved securely. Should the
+ The <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()"><code>committed position</code></a> is the last offset that has been saved securely. Should the
  process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
- offsets periodically, or it can choose to control this committed position manually by calling
- <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commit(org.apache.kafka.clients.consumer.CommitType)"><code>commit</code></a>.
+ offsets periodically; or it can choose to control this committed position manually by calling
+ <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()"><code>commitSync</code></a>, which blocks until the offsets have been successfully committed
+ or a fatal error has occurred during the commit process, or <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(org.apache.kafka.clients.consumer.OffsetCommitCallback)"><code>commitAsync</code></a>, which is non-blocking
+ and triggers the <a href="../../../../../org/apache/kafka/clients/consumer/OffsetCommitCallback.html" title="interface in org.apache.kafka.clients.consumer"><code>OffsetCommitCallback</code></a> once the commit has either succeeded or fatally failed.
  <p>
  This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  detail below.
- 
- <h3>Consumer Groups</h3>
- 
+
+ <h3>Consumer Groups and Topic Subscriptions</h3>
+
  Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
  processing records. These processes can either be running on the same machine or, as is more likely, they can be
  distributed over many machines to provide additional scalability and fault tolerance for processing.
  <p>
- Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the
+ Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the
+ list of topics it wants to subscribe to through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(List, ConsumerRebalanceListener)</code></a>,
+ or subscribe to all topics matching a certain pattern through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Pattern, ConsumerRebalanceListener)</code></a>.
+ Kafka will deliver each message in the
  subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
  over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
  processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
@@ -156,43 +160,46 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  a queue in a traditional messaging system all processes would be part of a single consumer group and hence record
  delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can
  have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
- have it's own consumer group, so each process would subscribe to all the records published to the topic.
+ have its own consumer group, so each process would subscribe to all the records published to the topic.
  <p>
- In addition, when offsets are committed they are always committed for a given consumer group.
+ In addition, when group reassignment happens automatically, consumers can be notified through <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a>,
+ which allows them to finish necessary application-level logic such as state cleanup, manual offset
+ commits (note that offsets are always committed for a given consumer group), etc.
+ See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details.
  <p>
- It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
- partition balancing.
- 
+ It is also possible for the consumer to manually specify the partitions that are assigned to it through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)"><code>assign(List)</code></a>,
+ which disables this dynamic partition assignment.
+
  <h3>Usage Examples</h3>
  The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
  demonstrate how to use them.
- 
- <h4>Simple Processing</h4>
- This example demonstrates the simplest usage of Kafka's consumer api.
- 
+
+ <h4>Automatic Offset Committing</h4>
+ This example demonstrates a simple usage of Kafka's consumer API that relies on automatic offset committing.
+ <p>
  <pre>
      Properties props = new Properties();
-     props.put(&quot;metadata.broker.list&quot;, &quot;localhost:9092&quot;);
+     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
      props.put(&quot;group.id&quot;, &quot;test&quot;);
      props.put(&quot;enable.auto.commit&quot;, &quot;true&quot;);
      props.put(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
      props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
-     props.put(&quot;key.serializer&quot;, &quot;org.apache.kafka.common.serializers.StringSerializer&quot;);
-     props.put(&quot;value.serializer&quot;, &quot;org.apache.kafka.common.serializers.StringSerializer&quot;);
+     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
+     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
      KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
-     consumer.subscribe(&quot;foo&quot;, &quot;bar&quot;);
+     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
      while (true) {
          ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
          for (ConsumerRecord&lt;String, String&gt; record : records)
              System.out.printf(&quot;offset = %d, key = %s, value = %s&quot;, record.offset(), record.key(), record.value());
      }
  </pre>
- 
+
  Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
  the config <code>auto.commit.interval.ms</code>.
  <p>
  The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
- configuration <code>metadata.broker.list</code>. This list is just used to discover the rest of the brokers in the
+ configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the
  cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
  case there are servers down when the client is connecting).
  <p>
@@ -200,16 +207,19 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  called <i>test</i> as described above.
  <p>
  The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
- consumer will automatically ping the cluster periodically, which let's the cluster know that it is alive. As long as
+ consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
  the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
  to it. If it stops heartbeating for a period of time longer than <code>session.timeout.ms</code> then it will be
- considered dead and it's partitions will be assigned to another process.
+ considered dead and its partitions will be assigned to another process.
  <p>
- The serializers settings specify how to turn the objects the user provides into bytes. By specifying the string
- serializers we are saying that our record's key and value will just be simple strings.
- 
- <h4>Controlling When Messages Are Considered Consumed</h4>
- 
+ The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
+ are saying that our record's key and value will just be simple strings.
+
+ <h4>Manual Offset Control</h4>
+
+ Instead of relying on the consumer to periodically commit consumed offsets, users can also control when messages
+ should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages
+ is coupled with some processing logic and hence a message should not be considered as consumed until its processing has completed.
  In this example we will consume a batch of records and batch them up in memory, when we have sufficient records
  batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
  would be considered consumed after they were given out by the consumer, and it would be possible that our process
@@ -221,18 +231,18 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
  Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
  time but in failure cases could be duplicated.
- 
+ <p>
  <pre>
      Properties props = new Properties();
-     props.put(&quot;metadata.broker.list&quot;, &quot;localhost:9092&quot;);
+     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
      props.put(&quot;group.id&quot;, &quot;test&quot;);
      props.put(&quot;enable.auto.commit&quot;, &quot;false&quot;);
      props.put(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
      props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
-     props.put(&quot;key.serializer&quot;, &quot;org.apache.kafka.common.serializers.StringSerializer&quot;);
-     props.put(&quot;value.serializer&quot;, &quot;org.apache.kafka.common.serializers.StringSerializer&quot;);
+     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
+     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
      KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
-     consumer.subscribe(&quot;foo&quot;, &quot;bar&quot;);
+     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
      int commitInterval = 200;
      List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;ConsumerRecord&lt;String, String&gt;&gt;();
      while (true) {
@@ -241,15 +251,15 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
              buffer.add(record);
              if (buffer.size() &gt;= commitInterval) {
                  insertIntoDb(buffer);
-                 consumer.commit(CommitType.SYNC);
+                 consumer.commitSync();
                  buffer.clear();
              }
          }
      }
  </pre>
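
The at-least-once behaviour described above can be illustrated without a broker. The sketch below is a self-contained simulation, not Kafka client code: the class `AtLeastOnceSketch`, its `run` method, the in-memory `db` list and the `committed` field are hypothetical stand-ins for `insertIntoDb(buffer)` and `commitSync()`. It assumes a failure that happens after the database insert but before the commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the buffer-then-commit loop; no Kafka classes are used.
final class AtLeastOnceSketch {
    // Stand-in for the committed position: a restarted run resumes from this offset.
    static long committed = 0;
    // Stand-in for the external database that insertIntoDb(buffer) would write to.
    static final List<Long> db = new ArrayList<>();

    /**
     * Consumes offsets [committed, logEnd) in batches of batchSize.
     * When crashBeforeCommit is true, the run stops right after the first
     * insert and before the matching commit, simulating a process failure.
     */
    static void run(long logEnd, int batchSize, boolean crashBeforeCommit) {
        List<Long> buffer = new ArrayList<>();
        for (long offset = committed; offset < logEnd; offset++) {
            buffer.add(offset);
            if (buffer.size() >= batchSize) {
                db.addAll(buffer);              // insertIntoDb(buffer)
                if (crashBeforeCommit) {
                    return;                     // simulated crash: the commit never happens
                }
                committed = offset + 1;         // commitSync(): one past the last processed record
                buffer.clear();
            }
        }
    }
}
```

Running it once with the simulated crash and then again without it leaves offsets 0 and 1 in `db` twice, which is the duplication the text warns about.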
- 
+
  <h4>Subscribing To Specific Partitions</h4>
- 
+
  In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
  a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
  instances of our program can divide up the work of processing records.
@@ -269,24 +279,24 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  <p>
  This mode is easy to specify, rather than subscribing to the topic, the consumer just subscribes to particular
  partitions:
- 
+
  <pre>
      String topic = &quot;foo&quot;;
      TopicPartition partition0 = new TopicPartition(topic, 0);
      TopicPartition partition1 = new TopicPartition(topic, 1);
-     consumer.subscribe(partition0);
-     consumer.subscribe(partition1);
+     // assign() takes a list and replaces any previous assignment, so pass both partitions in one call
+     consumer.assign(Arrays.asList(partition0, partition1));
  </pre>
- 
+
  The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
  be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
  <p>
  It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
  balancing) using the same consumer instance.
- 
- <h4>Managing Your Own Offsets</h4>
- 
- The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of it's own
+
+ <h4><a name="rebalancecallback">Storing Offsets Outside Kafka</a></h4>
+
+ The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own
  choosing. The primary use case for this is allowing the application to store both the offset and the results of the
  consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
  possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are
@@ -305,31 +315,33 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  This means that in this case the indexing process that comes back having lost recent updates just resumes indexing
  from what it has ensuring that no updates are lost.
  </ul>
- 
- Each record comes with it's own offset, so to manage your own offset you just need to do the following:
- <ol>
+ <p>
+ Each record comes with its own offset, so to manage your own offset you just need to do the following:
+
+ <ul>
  <li>Configure <code>enable.auto.commit=false</code>
  <li>Use the offset provided with each <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRecord.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerRecord</code></a> to save your position.
- <li>On restart restore the position of the consumer using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition, long)"><code>seek(TopicPartition, long)</code></a>.
- </ol>
- 
+ <li>On restart restore the position of the consumer using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)"><code>seek(TopicPartition, long)</code></a>.
+ </ul>
+
+ <p>
  This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
- search index use case described above). If the partition assignment is done automatically special care will also be
- needed to handle the case where partition assignments change. This can be handled using a special callback specified
- using <code>rebalance.callback.class</code>, which specifies an implementation of the interface
- <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceCallback</code></a>. When partitions are taken from a consumer the consumer will want to commit its
- offset for those partitions by implementing
- <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html#onPartitionsRevoked(org.apache.kafka.clients.consumer.Consumer, java.util.Collection)"><code>ConsumerRebalanceCallback.onPartitionsRevoked(Consumer, Collection)</code></a>. When partitions are assigned to a
+ search index use case described above). If the partition assignment is done automatically special care is
+ needed to handle the case where partition assignments change. This can be done by providing a
+ <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> instance in the call to <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(List, ConsumerRebalanceListener)</code></a>
+ or <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Pattern, ConsumerRebalanceListener)</code></a>.
+ For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
+ implementing <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)"><code>ConsumerRebalanceListener.onPartitionsRevoked(Collection)</code></a>. When partitions are assigned to a
  consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
- to that position by implementing <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html#onPartitionsAssigned(org.apache.kafka.clients.consumer.Consumer, java.util.Collection)"><code>ConsumerRebalanceCallback.onPartitionsAssigned(Consumer, Collection)</code></a>.
+ to that position by implementing <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsAssigned(java.util.Collection)"><code>ConsumerRebalanceListener.onPartitionsAssigned(Collection)</code></a>.
  <p>
- Another common use for <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceCallback</code></a> is to flush any caches the application maintains for
+ Another common use for <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> is to flush any caches the application maintains for
  partitions that are moved elsewhere.
- 
+
  <h4>Controlling The Consumer's Position</h4>
- 
- In most use cases the consumer will simply consume records from beginning to end, periodically committing it's
- position (either automatically or manually). However Kafka allows the consumer to manually control it's position,
+
+ In most use cases the consumer will simply consume records from beginning to end, periodically committing its
+ position (either automatically or manually). However Kafka allows the consumer to manually control its position,
  moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
  the most recent records without actually consuming the intermediate records.
  <p>
@@ -339,24 +351,89 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  attempt to catch up processing all records, but rather just skip to the most recent records.
  <p>
  Another use case is for a system that maintains local state as described in the previous section. In such a system
- the consumer will want to initialize it's position on start-up to whatever is contained in the local store. Likewise
+ the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
  if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
- reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
- 
- Kafka allows specifying the position using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition, long)"><code>seek(TopicPartition, long)</code></a> to specify the new position. Special
+ re-consuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
+ <p>
+ Kafka allows specifying the position using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)"><code>seek(TopicPartition, long)</code></a> to specify the new position. Special
  methods for seeking to the earliest and latest offset the server maintains are also available (
  <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(org.apache.kafka.common.TopicPartition...)"><code>seekToBeginning(TopicPartition...)</code></a> and <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd(org.apache.kafka.common.TopicPartition...)"><code>seekToEnd(TopicPartition...)</code></a> respectively).
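
The effect of the seek methods on what the next poll returns can be sketched with an in-memory log. This is a simplified model rather than the client itself: `SeekSketch` and its methods are hypothetical names, the "partition" is just a list indexed by offset, and the range check in `seek` is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition model: the list index plays the role of the offset.
final class SeekSketch {
    private final List<String> log;
    private int position = 0;   // offset of the next record poll() will hand out

    SeekSketch(List<String> log) {
        this.log = new ArrayList<>(log);
    }

    // Moves the position to an arbitrary offset (range-checked in this sketch only).
    void seek(int offset) {
        if (offset < 0 || offset > log.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        position = offset;
    }

    void seekToBeginning() { position = 0; }

    void seekToEnd() { position = log.size(); }

    int position() { return position; }

    // Returns up to maxRecords records starting at the current position and advances it.
    List<String> poll(int maxRecords) {
        int end = Math.min(log.size(), position + maxRecords);
        List<String> out = new ArrayList<>(log.subList(position, end));
        position = end;
        return out;
    }
}
```

Seeking backwards makes the next poll return records that were already handed out, while seeking to the end skips everything still unread.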
- 
- <h3>Multithreaded Processing</h3>
- 
- The Kafka consumer is threadsafe but coarsely synchronized. All network I/O happens in the thread of the application
- making the call. We have intentionally avoided implementing a particular threading model for processing.
- <p>
- This leaves several options for implementing multi-threaded processing of records.
- 
+
+ <h4>Consumption Flow Control</h4>
+
+ If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time,
+ effectively giving these partitions the same priority for consumption. However in some cases consumers may want to
+ first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions
+ when these partitions have little or no data to consume.
+
+ <p>
+ One such case is stream processing, where a processor fetches from two topics and performs a join on the two streams.
+ When one of the topics is lagging far behind the other, the processor would like to pause fetching from the topic that is ahead
+ in order to let the lagging stream catch up. Another example is bootstrapping on consumer startup, when there is
+ a lot of historical data to catch up on: the application usually wants to get the latest data on some of the topics before
+ it considers fetching other topics.
+
+ <p>
+ Kafka supports dynamic control of consumption flows through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(org.apache.kafka.common.TopicPartition...)"><code>pause(TopicPartition...)</code></a> and <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#resume(org.apache.kafka.common.TopicPartition...)"><code>resume(TopicPartition...)</code></a>,
+ which respectively pause consumption on the specified assigned partitions and resume consumption
+ on the specified paused partitions in future <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(long)</code></a> calls.
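
The join scenario above can be modelled in a few lines. The following is a toy simulation under stated assumptions, not the consumer implementation: `FlowControlSketch`, `append` and the string partition names are hypothetical, and `poll()` here returns at most one record per partition to keep the behaviour easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of pause/resume: paused partitions keep their data but are skipped by poll().
final class FlowControlSketch {
    private final Map<String, Deque<String>> pending = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    // Adds a record that is waiting to be fetched from the given partition.
    void append(String partition, String record) {
        pending.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(record);
    }

    void pause(String partition) { paused.add(partition); }

    void resume(String partition) { paused.remove(partition); }

    // Returns one record from every partition that is not paused and has data.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> entry : pending.entrySet()) {
            if (!paused.contains(entry.getKey()) && !entry.getValue().isEmpty()) {
                out.add(entry.getValue().poll());
            }
        }
        return out;
    }
}
```

Pausing the partition that is ahead lets the lagging one drain first; nothing is lost, because the paused partition's records are returned once it is resumed.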
+
+ <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
+
+ The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
+ making the call. It is the responsibility of the user to ensure that multi-threaded access
+ is properly synchronized. Un-synchronized access will result in <code>ConcurrentModificationException</code>.
+
+ <p>
+ The only exception to this rule is <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup()"><code>wakeup()</code></a>, which can safely be used from an external thread to
+ interrupt an active operation. In this case, a <a href="../../../../../org/apache/kafka/common/errors/WakeupException.html" title="class in org.apache.kafka.common.errors"><code>WakeupException</code></a> will be
+ thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread.
+ The following snippet shows the typical pattern:
+
+ <pre>
+ public class KafkaConsumerRunner implements Runnable {
+     private final AtomicBoolean closed = new AtomicBoolean(false);
+     private final KafkaConsumer consumer;
+
+     public KafkaConsumerRunner(KafkaConsumer consumer) { this.consumer = consumer; }
+
+     public void run() {
+         try {
+             consumer.subscribe(Arrays.asList("topic"));
+             while (!closed.get()) {
+                 ConsumerRecords records = consumer.poll(10000);
+                 // Handle new records
+             }
+         } catch (WakeupException e) {
+             // Ignore exception if closing
+             if (!closed.get()) throw e;
+         } finally {
+             consumer.close();
+         }
+     }
+
+     // Shutdown hook which can be called from a separate thread
+     public void shutdown() {
+         closed.set(true);
+         consumer.wakeup();
+     }
+ }
+ </pre>
+
+ Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
+
+ <p>
+ <pre>
+     closed.set(true);
+     consumer.wakeup();
+ </pre>
+
+ <p>
+ We have intentionally avoided implementing a particular threading model for processing. This leaves several
+ options for implementing multi-threaded processing of records.
+
+
  <h4>1. One Consumer Per Thread</h4>
- 
- A simple option is to give each thread it's own consumer instance. Here are the pros and cons of this approach:
+
+ A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
  <ul>
  <li><b>PRO</b>: It is the easiest to implement
  <li><b>PRO</b>: It is often the fastest as no inter-thread co-ordination is needed
@@ -368,13 +445,13 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  which can cause some drop in I/O throughput.
  <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions.
  </ul>
- 
+
  <h4>2. Decouple Consumption and Processing</h4>
- 
+
  Another alternative is to have one or more consumer threads that do all data consumption and hand off
  <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRecords.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerRecords</code></a> instances to a blocking queue consumed by a pool of processor threads that actually handle
  the record processing.
- 
+
  This option likewise has pros and cons:
  <ul>
  <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it
@@ -385,8 +462,8 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure
  that processing is complete for that partition.
  </ul>
- 
- There are many possible variations on this approach. For example each processor thread can have it's own queue, and
+
+ There are many possible variations on this approach. For example each processor thread can have its own queue, and
  the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify
  commit.</div>
 </li>
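
The per-processor-queue variation described above can be sketched roughly as follows. This is a minimal illustration, not code from the Kafka client: `PartitionedDispatcher`, `PartitionKey` (a hypothetical stand-in for `TopicPartition`), and the use of plain `String` values in place of consumer records are all assumptions made so the example runs without the Kafka library on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PartitionedDispatcher {

    // Hypothetical stand-in for Kafka's TopicPartition (topic name + partition id).
    static final class PartitionKey {
        final String topic;
        final int partition;

        PartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof PartitionKey)) return false;
            PartitionKey other = (PartitionKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // One queue per processor thread; each queue preserves insertion order.
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    PartitionedDispatcher(int processors) {
        if (processors <= 0) {
            throw new IllegalArgumentException("processors must be positive");
        }
        for (int i = 0; i < processors; i++) {
            // Unbounded for simplicity; a real pipeline would likely bound this
            // and block the consumer thread to apply back-pressure.
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    // Every record of a given partition maps to the same queue index.
    int queueIndexFor(PartitionKey key) {
        return Math.floorMod(key.hashCode(), queues.size());
    }

    // Called by the consumer thread for each record it hands off.
    void dispatch(PartitionKey key, String record) {
        queues.get(queueIndexFor(key)).add(record);
    }

    BlockingQueue<String> queue(int index) {
        return queues.get(index);
    }

    public static void main(String[] args) {
        PartitionedDispatcher dispatcher = new PartitionedDispatcher(4);
        PartitionKey p0 = new PartitionKey("events", 0);
        PartitionKey p1 = new PartitionKey("events", 1);
        dispatcher.dispatch(p0, "first");
        dispatcher.dispatch(p1, "other");
        dispatcher.dispatch(p0, "second");
        System.out.println("events-0 -> queue " + dispatcher.queueIndexFor(p0));
        System.out.println("events-1 -> queue " + dispatcher.queueIndexFor(p1));
    }
}
```

Because a partition always maps to the same queue, a single processor thread sees that partition's records in order. That is also what simplifies commit: only that one thread needs to report how far processing has progressed for the partition.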
@@ -412,12 +489,11 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
 </td>
 </tr>
 <tr class="rowColor">
-<td class="colOne"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#KafkaConsumer(java.util.Map, org.apache.kafka.clients.consumer.ConsumerRebalanceCallback, org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer)">KafkaConsumer</a></strong>(java.util.Map&lt;java.lang.String,java.lang.Object&gt;&nbsp;configs,
-             <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceCallback</a>&nbsp;callback,
+<td class="colOne"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#KafkaConsumer(java.util.Map,%20org.apache.kafka.common.serialization.Deserializer,%20org.apache.kafka.common.serialization.Deserializer)">KafkaConsumer</a></strong>(java.util.Map&lt;java.lang.String,java.lang.Object&gt;&nbsp;configs,
              <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>&gt;&nbsp;keyDeserializer,
              <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;&nbsp;valueDeserializer)</code>
 <div class="block">A consumer is instantiated by providing a set of key-value pairs as configuration, a
- <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceCallback</code></a> implementation, a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.</div>
+ <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> implementation, a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.</div>
 </td>
 </tr>
 <tr class="altColor">
@@ -426,12 +502,11 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
 </td>
 </tr>
 <tr class="rowColor">
-<td class="colOne"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#KafkaConsumer(java.util.Properties, org.apache.kafka.clients.consumer.ConsumerRebalanceCallback, org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer)">KafkaConsumer</a></strong>(java.util.Properties&nbsp;properties,
-             <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceCallback</a>&nbsp;callback,
+<td class="colOne"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#KafkaConsumer(java.util.Properties,%20org.apache.kafka.common.serialization.Deserializer,%20org.apache.kafka.common.serialization.Deserializer)">KafkaConsumer</a></strong>(java.util.Properties&nbsp;properties,
              <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>&gt;&nbsp;keyDeserializer,
              <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;&nbsp;valueDeserializer)</code>
 <div class="block">A consumer is instantiated by providing a <code>Properties</code> object as configuration and a
- <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceCallback</code></a> implementation, a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.</div>
+ <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> implementation, a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.</div>
 </td>
 </tr>
 </table>
@@ -451,28 +526,66 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
 </tr>
 <tr class="altColor">
 <td class="colFirst"><code>void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#close()">close</a></strong>()</code>&nbsp;</td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)">assign</a></strong>(java.util.List&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&gt;&nbsp;partitions)</code>
+<div class="block">Manually assign a list of partitions to this consumer.</div>
+</td>
 </tr>
 <tr class="rowColor">
+<td class="colFirst"><code>java.util.Set&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&gt;</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assignment()">assignment</a></strong>()</code>
+<div class="block">Get the set of partitions currently assigned to this consumer.</div>
+</td>
+</tr>
+<tr class="altColor">
 <td class="colFirst"><code>void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commit(org.apache.kafka.clients.consumer.CommitType)">commit</a></strong>(<a href="../../../../../org/apache/kafka/clients/consumer/CommitType.html" title="enum in org.apache.kafka.clients.consumer">CommitType</a>&nbsp;commitType)</code>
-<div class="block">Commits offsets returned on the last <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> for the subscribed list of topics and partitions.</div>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#close()">close</a></strong>()</code>
+<div class="block">Close the consumer, waiting indefinitely for any needed cleanup.</div>
+</td>
+</tr>
+<tr class="rowColor">
+<td class="colFirst"><code>void</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync()">commitAsync</a></strong>()</code>
+<div class="block">Commit offsets returned on the last <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> for all the subscribed list of topics and partitions.</div>
 </td>
 </tr>
 <tr class="altColor">
 <td class="colFirst"><code>void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commit(java.util.Map, org.apache.kafka.clients.consumer.CommitType)">commit</a></strong>(java.util.Map&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,java.lang.Long&gt;&nbsp;offsets,
-      <a href="../../../../../org/apache/kafka/clients/consumer/CommitType.html" title="enum in org.apache.kafka.clients.consumer">CommitType</a>&nbsp;commitType)</code>
-<div class="block">Commits the specified offsets for the specified list of topics and partitions to Kafka.</div>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)">commitAsync</a></strong>(java.util.Map&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,<a href="../../../../../org/apache/kafka/clients/consumer/OffsetAndMetadata.html" title="class in org.apache.kafka.clients.consumer">OffsetAndMetadata</a>&gt;&nbsp;offsets,
+           <a href="../../../../../org/apache/kafka/clients/consumer/OffsetCommitCallback.html" title="interface in org.apache.kafka.clients.consumer">OffsetCommitCallback</a>&nbsp;callback)</code>
+<div class="block">Commit the specified offsets for the specified list of topics and partitions to Kafka.</div>
 </td>
 </tr>
 <tr class="rowColor">
-<td class="colFirst"><code>long</code></td>
+<td class="colFirst"><code>void</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(org.apache.kafka.clients.consumer.OffsetCommitCallback)">commitAsync</a></strong>(<a href="../../../../../org/apache/kafka/clients/consumer/OffsetCommitCallback.html" title="interface in org.apache.kafka.clients.consumer">OffsetCommitCallback</a>&nbsp;callback)</code>
+<div class="block">Commit offsets returned on the last <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> for the subscribed list of topics and partitions.</div>
+</td>
+</tr>
+<tr class="altColor">
+<td class="colFirst"><code>void</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()">commitSync</a></strong>()</code>
+<div class="block">Commit offsets returned on the last <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> for all the subscribed list of topics and partitions.</div>
+</td>
+</tr>
+<tr class="rowColor">
+<td class="colFirst"><code>void</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)">commitSync</a></strong>(java.util.Map&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>,<a href="../../../../../org/apache/kafka/clients/consumer/OffsetAndMetadata.html" title="class in org.apache.kafka.clients.consumer">OffsetAndMetadata</a>&gt;&nbsp;offsets)</code>
+<div class="block">Commit the specified offsets for the specified list of topics and partitions.</div>
+</td>
+</tr>
+<tr class="altColor">
+<td class="colFirst"><code><a href="../../../../../org/apache/kafka/clients/consumer/OffsetAndMetadata.html" title="class in org.apache.kafka.clients.consumer">OffsetAndMetadata</a></code></td>
 <td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#committed(org.apache.kafka.common.TopicPartition)">committed</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&nbsp;partition)</code>
-<div class="block">Fetches the last committed offset for the given partition (whether the commit happened by this process or
+<div class="block">Get the last committed offset for the given partition (whether the commit happened by this process or
  another).</div>
 </td>
 </tr>
+<tr class="rowColor">
+<td class="colFirst"><code>java.util.Map&lt;java.lang.String,java.util.List&lt;<a href="../../../../../org/apache/kafka/common/PartitionInfo.html" title="class in org.apache.kafka.common">PartitionInfo</a>&gt;&gt;</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#listTopics()">listTopics</a></strong>()</code>
+<div class="block">Get metadata about partitions for all topics that the user is authorized to view.</div>
+</td>
+</tr>
 <tr class="altColor">
 <td class="colFirst"><code>java.util.Map&lt;<a href="../../../../../org/apache/kafka/common/MetricName.html" title="class in org.apache.kafka.common">MetricName</a>,? extends <a href="../../../../../org/apache/kafka/common/Metric.html" title="interface in org.apache.kafka.common">Metric</a>&gt;</code></td>
 <td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#metrics()">metrics</a></strong>()</code>
@@ -486,20 +599,32 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
 </td>
 </tr>
 <tr class="altColor">
+<td class="colFirst"><code>void</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(org.apache.kafka.common.TopicPartition...)">pause</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>...&nbsp;partitions)</code>
+<div class="block">Suspend fetching from the requested partitions.</div>
+</td>
+</tr>
+<tr class="rowColor">
 <td class="colFirst"><code><a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRecords.html" title="class in org.apache.kafka.clients.consumer">ConsumerRecords</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></td>
 <td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">poll</a></strong>(long&nbsp;timeout)</code>
-<div class="block">Fetches data for the topics or partitions specified using one of the subscribe APIs.</div>
+<div class="block">Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.</div>
 </td>
 </tr>
-<tr class="rowColor">
+<tr class="altColor">
 <td class="colFirst"><code>long</code></td>
 <td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#position(org.apache.kafka.common.TopicPartition)">position</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&nbsp;partition)</code>
-<div class="block">Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).</div>
+<div class="block">Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).</div>
+</td>
+</tr>
+<tr class="rowColor">
+<td class="colFirst"><code>void</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#resume(org.apache.kafka.common.TopicPartition...)">resume</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>...&nbsp;partitions)</code>
+<div class="block">Resume specified partitions which have been paused with <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(org.apache.kafka.common.TopicPartition...)"><code>pause(TopicPartition...)</code></a>.</div>
 </td>
 </tr>
 <tr class="altColor">
 <td class="colFirst"><code>void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition, long)">seek</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&nbsp;partition,
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)">seek</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&nbsp;partition,
     long&nbsp;offset)</code>
 <div class="block">Overrides the fetch offsets that the consumer will use on the next <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll(timeout)</code></a>.</div>
 </td>
@@ -507,44 +632,52 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
 <tr class="rowColor">
 <td class="colFirst"><code>void</code></td>
 <td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(org.apache.kafka.common.TopicPartition...)">seekToBeginning</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>...&nbsp;partitions)</code>
-<div class="block">Seek to the first offset for each of the given partitions</div>
+<div class="block">Seek to the first offset for each of the given partitions.</div>
 </td>
 </tr>
 <tr class="altColor">
 <td class="colFirst"><code>void</code></td>
 <td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToEnd(org.apache.kafka.common.TopicPartition...)">seekToEnd</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>...&nbsp;partitions)</code>
-<div class="block">Seek to the last offset for each of the given partitions</div>
+<div class="block">Seek to the last offset for each of the given partitions.</div>
 </td>
 </tr>
 <tr class="rowColor">
 <td class="colFirst"><code>void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.lang.String...)">subscribe</a></strong>(java.lang.String...&nbsp;topics)</code>
-<div class="block">Incrementally subscribes to the given list of topics and uses the consumer's group management functionality</div>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List)">subscribe</a></strong>(java.util.List&lt;java.lang.String&gt;&nbsp;topics)</code>
+<div class="block">Subscribe to the given list of topics to get dynamically assigned partitions.</div>
 </td>
 </tr>
 <tr class="altColor">
 <td class="colFirst"><code>void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(org.apache.kafka.common.TopicPartition...)">subscribe</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>...&nbsp;partitions)</code>
-<div class="block">Incrementally subscribes to a specific topic partition and does not use the consumer's group management
- functionality.</div>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">subscribe</a></strong>(java.util.List&lt;java.lang.String&gt;&nbsp;topics,
+         <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceListener</a>&nbsp;listener)</code>
+<div class="block">Subscribe to the given list of topics to get dynamically
+ assigned partitions.</div>
 </td>
 </tr>
 <tr class="rowColor">
-<td class="colFirst"><code>java.util.Set&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&gt;</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscriptions()">subscriptions</a></strong>()</code>
-<div class="block">The set of partitions currently assigned to this consumer.</div>
+<td class="colFirst"><code>void</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">subscribe</a></strong>(java.util.regex.Pattern&nbsp;pattern,
+         <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceListener</a>&nbsp;listener)</code>
+<div class="block">Subscribe to all topics matching specified pattern to get dynamically assigned partitions.</div>
 </td>
 </tr>
 <tr class="altColor">
-<td class="colFirst"><code>void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe(java.lang.String...)">unsubscribe</a></strong>(java.lang.String...&nbsp;topics)</code>
-<div class="block">Unsubscribe from the specific topics.</div>
+<td class="colFirst"><code>java.util.Set&lt;java.lang.String&gt;</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscription()">subscription</a></strong>()</code>
+<div class="block">Get the current subscription.</div>
 </td>
 </tr>
 <tr class="rowColor">
 <td class="colFirst"><code>void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe(org.apache.kafka.common.TopicPartition...)">unsubscribe</a></strong>(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>...&nbsp;partitions)</code>
-<div class="block">Unsubscribe from the specific topic partitions.</div>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe()">unsubscribe</a></strong>()</code>
+<div class="block">Unsubscribe from topics currently subscribed with <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List)"><code>subscribe(List)</code></a>.</div>
+</td>
+</tr>
+<tr class="altColor">
+<td class="colFirst"><code>void</code></td>
+<td class="colLast"><code><strong><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup()">wakeup</a></strong>()</code>
+<div class="block">Wake up the consumer.</div>
 </td>
 </tr>
 </table>
@@ -585,22 +718,20 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
 <dl><dt><span class="strong">Parameters:</span></dt><dd><code>configs</code> - The consumer configs</dd></dl>
 </li>
 </ul>
-<a name="KafkaConsumer(java.util.Map, org.apache.kafka.clients.consumer.ConsumerRebalanceCallback, org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer)">
+<a name="KafkaConsumer(java.util.Map, org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer)">
 <!--   -->
 </a>
 <ul class="blockList">
 <li class="blockList">
 <h4>KafkaConsumer</h4>
 <pre>public&nbsp;KafkaConsumer(java.util.Map&lt;java.lang.String,java.lang.Object&gt;&nbsp;configs,
-             <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceCallback</a>&nbsp;callback,
              <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>&gt;&nbsp;keyDeserializer,
              <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;&nbsp;valueDeserializer)</pre>
 <div class="block">A consumer is instantiated by providing a set of key-value pairs as configuration, a
- <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceCallback</code></a> implementation, a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.
+ <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> implementation, a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.
  <p>
  Valid configuration strings are documented at <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerConfig.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a></div>
-<dl><dt><span class="strong">Parameters:</span></dt><dd><code>configs</code> - The consumer configs</dd><dd><code>callback</code> - A callback interface that the user can implement to manage customized offsets on the start and
-            end of every rebalance operation.</dd><dd><code>keyDeserializer</code> - The deserializer for key that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method
+<dl><dt><span class="strong">Parameters:</span></dt><dd><code>configs</code> - The consumer configs</dd><dd><code>keyDeserializer</code> - The deserializer for key that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method
             won't be called in the consumer when the deserializer is passed in directly.</dd><dd><code>valueDeserializer</code> - The deserializer for value that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method
             won't be called in the consumer when the deserializer is passed in directly.</dd></dl>
 </li>
@@ -618,22 +749,20 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerConfig.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a></div>
 </li>
 </ul>
-<a name="KafkaConsumer(java.util.Properties, org.apache.kafka.clients.consumer.ConsumerRebalanceCallback, org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer)">
+<a name="KafkaConsumer(java.util.Properties, org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer)">
 <!--   -->
 </a>
 <ul class="blockListLast">
 <li class="blockList">
 <h4>KafkaConsumer</h4>
 <pre>public&nbsp;KafkaConsumer(java.util.Properties&nbsp;properties,
-             <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceCallback</a>&nbsp;callback,
              <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>&gt;&nbsp;keyDeserializer,
              <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization">Deserializer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;&nbsp;valueDeserializer)</pre>
 <div class="block">A consumer is instantiated by providing a <code>Properties</code> object as configuration and a
- <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceCallback</code></a> implementation, a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.
+ <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> implementation, a key and a value <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.
  <p>
  Valid configuration strings are documented at <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerConfig.html" title="class in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a></div>
-<dl><dt><span class="strong">Parameters:</span></dt><dd><code>properties</code> - The consumer configuration properties</dd><dd><code>callback</code> - A callback interface that the user can implement to manage customized offsets on the start and
-            end of every rebalance operation.</dd><dd><code>keyDeserializer</code> - The deserializer for key that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method
+<dl><dt><span class="strong">Parameters:</span></dt><dd><code>properties</code> - The consumer configuration properties</dd><dd><code>keyDeserializer</code> - The deserializer for key that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method
             won't be called in the consumer when the deserializer is passed in directly.</dd><dd><code>valueDeserializer</code> - The deserializer for value that implements <a href="../../../../../org/apache/kafka/common/serialization/Deserializer.html" title="interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>. The configure() method
             won't be called in the consumer when the deserializer is passed in directly.</dd></dl>
 </li>
@@ -646,32 +775,54 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
 <!--   -->
 </a>
 <h3>Method Detail</h3>
-<a name="subscriptions()">
+<a name="assignment()">
+<!--   -->
+</a>
+<ul class="blockList">
+<li class="blockList">
+<h4>assignment</h4>
+<pre>public&nbsp;java.util.Set&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&gt;&nbsp;assignment()</pre>
+<div class="block">Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
+ partitions using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)"><code>assign(List)</code></a> then this will simply return the same partitions that
+ were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
+ to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
+ process of getting reassigned).</div>
+<dl>
+<dt><strong>Specified by:</strong></dt>
+<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#assignment()">assignment</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
+<dt><span class="strong">Returns:</span></dt><dd>The set of partitions currently assigned to this consumer</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assignment()"><code>assignment()</code></a></dd></dl>
+</li>
+</ul>
+<a name="subscription()">
 <!--   -->
 </a>
 <ul class="blockList">
 <li class="blockList">
-<h4>subscriptions</h4>
-<pre>public&nbsp;java.util.Set&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&gt;&nbsp;subscriptions()</pre>
-<div class="block">The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to
- partitions using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(org.apache.kafka.common.TopicPartition...)"><code>subscribe(TopicPartition...)</code></a> then this will simply return the list of partitions that
- were subscribed to. If subscription was done by specifying only the topic using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.lang.String...)"><code>subscribe(String...)</code></a>
- then this will give the set of topics currently assigned to the consumer (which may be none if the assignment
- hasn't happened yet, or the partitions are in the process of getting reassigned).</div>
+<h4>subscription</h4>
+<pre>public&nbsp;java.util.Set&lt;java.lang.String&gt;&nbsp;subscription()</pre>
+<div class="block">Get the current subscription. Will return the same topics used in the most recent call to
+ <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(List, ConsumerRebalanceListener)</code></a>, or an empty set if no such call has been made.</div>
 <dl>
 <dt><strong>Specified by:</strong></dt>
-<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscriptions()">subscriptions</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
-<dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscriptions()"><code>subscriptions()</code></a></dd></dl>
+<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscription()">subscription</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
+<dt><span class="strong">Returns:</span></dt><dd>The set of topics currently subscribed to</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscription()"><code>subscription()</code></a></dd></dl>
 </li>
 </ul>
-<a name="subscribe(java.lang.String...)">
+<a name="subscribe(java.util.List, org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">
 <!--   -->
 </a>
 <ul class="blockList">
 <li class="blockList">
 <h4>subscribe</h4>
-<pre>public&nbsp;void&nbsp;subscribe(java.lang.String...&nbsp;topics)</pre>
-<div class="block">Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
+<pre>public&nbsp;void&nbsp;subscribe(java.util.List&lt;java.lang.String&gt;&nbsp;topics,
+             <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceListener</a>&nbsp;listener)</pre>
+<div class="block">Subscribe to the given list of topics to get dynamically
+ assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current
+ assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management
+ with manual partition assignment through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)"><code>assign(List)</code></a>.
+
+ If the given list of topics is empty, it is treated the same as <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe()"><code>unsubscribe()</code></a>.
+
  <p>
  As part of group management, the consumer will keep track of the list of consumers that belong to a particular
  group and will trigger a rebalance operation if one of the following events trigger -
@@ -680,58 +831,105 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
  <li>Topic is created or deleted
  <li>An existing member of the consumer group dies
  <li>A new member is added to an existing consumer group via the join API
- </ul></div>
+ </ul>
+ <p>
+ When any of these events are triggered, the provided listener will be invoked first to indicate that
+ the consumer's assignment has been revoked, and then again when the new assignment has been received.
+ Note that this listener will immediately override any listener set in a previous call to subscribe.
+ It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics
+ subscribed in this call. See <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer"><code>ConsumerRebalanceListener</code></a> for more details.</div>
 <dl>
 <dt><strong>Specified by:</strong></dt>
-<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscribe(java.lang.String...)">subscribe</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
-<dt><span class="strong">Parameters:</span></dt><dd><code>topics</code> - A variable list of topics that the consumer wants to subscribe to</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.lang.String...)"><code>subscribe(String...)</code></a></dd></dl>
+<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">subscribe</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
+<dt><span class="strong">Parameters:</span></dt><dd><code>topics</code> - The list of topics to subscribe to</dd><dd><code>listener</code> - Non-null listener instance to get notifications on partition assignment/revocation for the
+                 subscribed topics</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(List, ConsumerRebalanceListener)</code></a></dd></dl>
 </li>
 </ul>
-<a name="subscribe(org.apache.kafka.common.TopicPartition...)">
+<a name="subscribe(java.util.List)">
 <!--   -->
 </a>
 <ul class="blockList">
 <li class="blockList">
 <h4>subscribe</h4>
-<pre>public&nbsp;void&nbsp;subscribe(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>...&nbsp;partitions)</pre>
-<div class="block">Incrementally subscribes to a specific topic partition and does not use the consumer's group management
- functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
- metadata change.
- <p></div>
+<pre>public&nbsp;void&nbsp;subscribe(java.util.List&lt;java.lang.String&gt;&nbsp;topics)</pre>
+<div class="block">Subscribe to the given list of topics to get dynamically assigned partitions.
+ <b>Topic subscriptions are not incremental. This list will replace the current
+ assignment (if there is one).</b> It is not possible to combine topic subscription with group management
+ with manual partition assignment through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)"><code>assign(List)</code></a>.
+
+ If the given list of topics is empty, it is treated the same as <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe()"><code>unsubscribe()</code></a>.
+
+ <p>
+ This is a short-hand for <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(List, ConsumerRebalanceListener)</code></a>, which
+ uses a noop listener. If you need the ability to seek to particular offsets, you should prefer
+ <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(List, ConsumerRebalanceListener)</code></a>, since group rebalances will cause partition offsets
+ to be reset. You should also prefer to provide your own listener if you are doing your own offset
+ management since the listener gives you an opportunity to commit offsets before a rebalance finishes.</div>
 <dl>
 <dt><strong>Specified by:</strong></dt>
-<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscribe(org.apache.kafka.common.TopicPartition...)">subscribe</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
-<dt><span class="strong">Parameters:</span></dt><dd><code>partitions</code> - Partitions to incrementally subscribe to</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(org.apache.kafka.common.TopicPartition...)"><code>subscribe(TopicPartition...)</code></a></dd></dl>
+<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscribe(java.util.List)">subscribe</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
+<dt><span class="strong">Parameters:</span></dt><dd><code>topics</code> - The list of topics to subscribe to</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List)"><code>subscribe(List)</code></a></dd></dl>
 </li>
 </ul>
-<a name="unsubscribe(java.lang.String...)">
+<a name="subscribe(java.util.regex.Pattern, org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">
 <!--   -->
 </a>
 <ul class="blockList">
 <li class="blockList">
-<h4>unsubscribe</h4>
-<pre>public&nbsp;void&nbsp;unsubscribe(java.lang.String...&nbsp;topics)</pre>
-<div class="block">Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not
- be returned from the next <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> onwards</div>
+<h4>subscribe</h4>
+<pre>public&nbsp;void&nbsp;subscribe(java.util.regex.Pattern&nbsp;pattern,
+             <a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html" title="interface in org.apache.kafka.clients.consumer">ConsumerRebalanceListener</a>&nbsp;listener)</pre>
+<div class="block">Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics
+ existing at the time of check.
+ <p>
+ As part of group management, the consumer will keep track of the list of consumers that
+ belong to a particular group and will trigger a rebalance operation if one of the
+ following events trigger -
+ <ul>
+ <li>Number of partitions change for any of the subscribed list of topics
+ <li>Topic is created or deleted
+ <li>An existing member of the consumer group dies
+ <li>A new member is added to an existing consumer group via the join API
+ </ul></div>
 <dl>
 <dt><strong>Specified by:</strong></dt>
-<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#unsubscribe(java.lang.String...)">unsubscribe</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
-<dt><span class="strong">Parameters:</span></dt><dd><code>topics</code> - Topics to unsubscribe from</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe(java.lang.String...)"><code>unsubscribe(String...)</code></a></dd></dl>
+<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)">subscribe</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
+<dt><span class="strong">Parameters:</span></dt><dd><code>pattern</code> - Pattern to subscribe to</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(Pattern, ConsumerRebalanceListener)</code></a></dd></dl>
 </li>
 </ul>
-<a name="unsubscribe(org.apache.kafka.common.TopicPartition...)">
+<a name="unsubscribe()">
 <!--   -->
 </a>
 <ul class="blockList">
 <li class="blockList">
 <h4>unsubscribe</h4>
-<pre>public&nbsp;void&nbsp;unsubscribe(<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>...&nbsp;partitions)</pre>
-<div class="block">Unsubscribe from the specific topic partitions. records for these partitions will not be returned from the next
- <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)"><code>poll()</code></a> onwards</div>
+<pre>public&nbsp;void&nbsp;unsubscribe()</pre>
+<div class="block">Unsubscribe from topics currently subscribed with <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List)"><code>subscribe(List)</code></a>. This
+ also clears any partitions directly assigned through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)"><code>assign(List)</code></a>.</div>
 <dl>
 <dt><strong>Specified by:</strong></dt>
-<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#unsubscribe(org.apache.kafka.common.TopicPartition...)">unsubscribe</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
-<dt><span class="strong">Parameters:</span></dt><dd><code>partitions</code> - Partitions to unsubscribe from</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe(org.apache.kafka.common.TopicPartition...)"><code>unsubscribe(TopicPartition...)</code></a></dd></dl>
+<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#unsubscribe()">unsubscribe</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
+<dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#unsubscribe()"><code>unsubscribe()</code></a></dd></dl>
+</li>
+</ul>
+<a name="assign(java.util.List)">
+<!--   -->
+</a>
+<ul class="blockList">
+<li class="blockList">
+<h4>assign</h4>
+<pre>public&nbsp;void&nbsp;assign(java.util.List&lt;<a href="../../../../../org/apache/kafka/common/TopicPartition.html" title="class in org.apache.kafka.common">TopicPartition</a>&gt;&nbsp;partitions)</pre>
+<div class="block">Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
+ and will replace the previous assignment (if there is one).
+ <p>
+ Manual topic assignment through this method does not use the consumer's group management
+ functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
+ metadata change. Note that it is not possible to use both manual partition assignment with <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)"><code>assign(List)</code></a>
+ and group assignment with <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)"><code>subscribe(List, ConsumerRebalanceListener)</code></a>.</div>
+<dl>
+<dt><strong>Specified by:</strong></dt>
+<dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#assign(java.util.List)">assign</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
+<dt><span class="strong">Parameters:</span></dt><dd><code>partitions</code> - The list of partitions to assign this consumer</dd><dt><span class="strong">See Also:</span></dt><dd><a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)"><code>assign(List)</code></a></dd></dl>
 </li>
 </ul>
 <a name="poll(long)">
@@ -741,63 +939,147 @@ implements <a href="../../../../../org/apache/kafka/clients/consumer/Consumer.ht
 <li class="blockList">
 <h4>poll</h4>
 <pre>public&nbsp;<a href="../../../../../org/apache/kafka/clients/consumer/ConsumerRecords.html" title="class in org.apache.kafka.clients.consumer">ConsumerRecords</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;&nbsp;poll(long&nbsp;timeout)</pre>
-<div class="block">Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have
+<div class="block">Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
  subscribed to any topics or partitions before polling for data.
  <p>
- The offset used for fetching the data is governed by whether or not <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition, long)"><code>seek(TopicPartition, long)</code></a> is used.
- If <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition, long)"><code>seek(TopicPartition, long)</code></a> is used, it will use the specified offsets on startup and on every
- rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
- offset using <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#commit(java.util.Map, org.apache.kafka.clients.consumer.CommitType)"><code>commit(offsets, sync)</code></a> for the subscribed list of partitions.</div>
+ On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
+ consumed offset can be manually set through <a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)"><code>seek(TopicPartition, long)</code></a> or automatically set as the last committed
+ offset for the subscribed list of partitions</div>
 <dl>
 <dt><strong>Specified by:</strong></dt>
 <dd><code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html#poll(long)">poll</a></code>&nbsp;in interface&nbsp;<code><a href="../../../../../org/apache/kafka/clients/consumer/Consumer.html" title="interface in org.apache.kafka.clients.consumer">Consumer</a>&lt;<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">K</a>,<a href="../../../../../org/apache/kafka/clients/consumer/KafkaConsumer.html" title="type parameter in KafkaConsumer">V</a>&gt;</code></dd>
-<dt><span class="strong">Parameters:</span></dt><dd><code>timeout</code> - The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits
-            indefinitely. Must not be negative</dd>
+<dt><span class="strong">Parameters:</span></dt><dd><code>timeout</code> - The time, in milliseconds, spent waiting in

<TRUNCATED>
