kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 2.4 updated: Explain the separate upgrade paths for consumer groups and Streams (#7516)
Date Wed, 16 Oct 2019 23:12:31 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/2.4 by this push:
     new ee4f65d  Explain the separate upgrade paths for consumer groups and Streams (#7516)
ee4f65d is described below

commit ee4f65dfa5bd2a14e17805734be3d9e1fdfc25a2
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Wed Oct 16 16:09:09 2019 -0700

    Explain the separate upgrade paths for consumer groups and Streams (#7516)
    Document the upgrade path for the consumer and for Streams (note that they differ significantly).
    Needs to be cherry-picked to 2.4
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
 docs/streams/upgrade-guide.html | 17 +++++++++++------
 docs/upgrade.html               | 25 +++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1cc5b50..2de7917 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -34,10 +34,10 @@
-        Upgrading from any older version to {{fullDotVersion}} is possible: (1) if you are
upgrading from 2.0.x to {{fullDotVersion}} then a single rolling bounce is needed to swap
in the new jar,
-        (2) if you are upgrading from older versions than 2.0.x in the online mode, you would
need two rolling bounces where
-        the first rolling bounce phase you need to set config <code>upgrade.from="older
version"</code> (possible values are <code>"0.10.0", "0.10.1", "0.10.2", "0.11.0",
"1.0", and "1.1"</code>)
-        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>):
+        Upgrading from any older version to {{fullDotVersion}} is possible: you will need
to do two rolling bounces, where during the first rolling bounce phase you set the config
<code>upgrade.from="older version"</code>
+        (possible values are <code>"0.10.0" - "2.3"</code>) and during the second
you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol
of the embedded consumer. Note that you will continue to use the old eager
+        rebalancing protocol if you skip or delay the second rolling bounce, but you can
safely switch over to cooperative at any time once the entire group is on 2.4+ by removing
the config value and bouncing. For more details please refer to
+        <a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>:
         <li> prepare your application instances for a rolling bounce and make sure
that config <code>upgrade.from</code> is set to the version from which it is being
@@ -75,11 +75,16 @@
     <h3><a id="streams_api_changes_240" href="#streams_api_changes_240">Streams
API changes in 2.4.0</a></h3>
     <!-- Placeholder KIP-213 -->
     <!-- Placeholder KIP-307 -->
     <!-- Placeholder KIP-479 -->
-    <!-- Placeholder KIP-429 -->
+    <p>
+        With the introduction of incremental cooperative rebalancing, Streams no longer requires
all tasks be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance
only those tasks which are to be migrated to another consumer
+        for overall load balance will need to be closed and revoked. This changes the semantics
of the <code>StateListener</code> a bit, as it will not necessarily transition
to <code>REBALANCING</code> at the beginning of a rebalance anymore. Note that
+        this means IQ will now be available at all times except during state restoration,
including while a rebalance is in progress. If restoration is occurring when a rebalance begins,
we will continue to actively restore the state stores and/or process
+        standby tasks during a cooperative rebalance. Note that with this new rebalancing
protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures
all tasks are safely distributed. For details, please see
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol">KIP-429</a>.
+    </p>
         The 2.4.0 release contains newly added and reworked metrics.
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0c69336..6d9f702 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -45,6 +45,31 @@
         The old overloaded functions are deprecated and we would recommend users to make
their code changes to leverage the new methods (details
         can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-520%3A+Add+overloaded+Consumer%23committed+for+batching+partitions">KIP-520</a>).
+    <li>We are introducing incremental cooperative rebalancing to the clients' group
protocol, which allows consumers to keep all of their assigned partitions during a rebalance
+        and at the end revoke only those which must be migrated to another consumer for overall
cluster balance. The <code>ConsumerCoordinator</code> will choose the latest <code>RebalanceProtocol</code>
+        that is commonly supported by all of the consumer's supported assignors. You can
use the new built-in <code>CooperativeStickyAssignor</code> or plug in your own
custom cooperative assignor. To do
+        so you must implement the <code>ConsumerPartitionAssignor</code> interface
and include <code>RebalanceProtocol.COOPERATIVE</code> in the list returned by
+        Your custom assignor can then leverage the <code>ownedPartitions</code>
field in each consumer's <code>Subscription</code> to give partitions back to
their previous owners whenever possible. Note that when
+        a partition is to be reassigned to another consumer, it <em>must</em>
be removed from the new assignment until it has been revoked from its original owner. Any
consumer that has to revoke a partition will trigger
+        a followup rebalance to allow the revoked partition to safely be assigned to its
new owner. See the
+        <a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html">ConsumerPartitionAssignor
RebalanceProtocol javadocs</a> for more information.
+        <br>
+        To upgrade from the old (eager) protocol, which always revokes all partitions before
rebalancing, to cooperative rebalancing, you must follow a specific upgrade path to get all
clients on the same <code>ConsumerPartitionAssignor</code>
+        that supports the cooperative protocol. This can be done with two rolling bounces,
using the <code>CooperativeStickyAssignor</code> for the example: during the first
one, add "cooperative-sticky" to the list of supported assignors
+        for each member (without removing the previous assignor -- note that if previously
using the default, you must include that explicitly as well). You then bounce and/or upgrade it.
+        Once the entire group is on 2.4+ and all members have the "cooperative-sticky" among
their supported assignors, remove the other assignor(s) and perform a second rolling bounce
so that by the end all members support only the
+        cooperative protocol. For further details on the cooperative rebalancing protocol
and upgrade path, see <a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>.
+    </li>
+    <li>There are some behavioral changes to the <code>ConsumerRebalanceListener</code>,
as well as a new API. Exceptions thrown during any of the listener's three callbacks will
no longer be swallowed, and will instead be re-thrown
+        all the way up to the <code>Consumer.poll()</code> call. The <code>onPartitionsLost</code>
method has been added to allow users to react to abnormal circumstances where a consumer may
have lost ownership of its partitions
+        (such as a missed rebalance) and cannot commit offsets. By default, this will simply
call the existing <code>onPartitionsRevoked</code> API to align with previous
behavior. Note however that <code>onPartitionsLost</code> will not
+        be called when the set of lost partitions is empty. This means that no callback will
be invoked at the beginning of the first rebalance of a new consumer joining the group.
+        <br>
+        The semantics of the <code>ConsumerRebalanceListener</code>'s callbacks
are further changed when following the cooperative rebalancing protocol described above. In
addition to <code>onPartitionsLost</code>, <code>onPartitionsRevoked</code>
+        will also never be called when the set of revoked partitions is empty. The callback
will generally be invoked only at the end of a rebalance, and only on the set of partitions
that are being moved to another consumer. The
+        <code>onPartitionsAssigned</code> callback will however always be called,
even with an empty set of partitions, as a way to notify users of a rebalance event (this
is true for both cooperative and eager). For details on
+        the new callback semantics, see the <a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html">ConsumerRebalanceListener javadocs</a>.
+    </li>
 <h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>
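
The two upgrade paths documented in this commit can be sketched as client configuration. The following is a minimal illustration, not part of the commit: the class and method names are hypothetical, and the config keys are written as plain strings so that the sketch compiles without the Kafka client jars. It assumes a plain consumer group that previously used RangeAssignor, and a Streams application upgrading from 2.3. Appending the cooperative assignor after the existing one is a choice made for this sketch; the docs above only require that both be listed during the first bounce.

```java
import java.util.Properties;

// Hypothetical helper class; only the config keys and assignor class names come from Kafka.
class CooperativeUpgradeSketch {
    static final String ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
    static final String UPGRADE_FROM = "upgrade.from";
    static final String RANGE = "org.apache.kafka.clients.consumer.RangeAssignor";
    static final String COOPERATIVE_STICKY =
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";

    // Plain consumer, first rolling bounce: keep the previous assignor(s)
    // and add the cooperative one alongside them.
    static Properties consumerFirstBounce(String previousAssignors) {
        Properties props = new Properties();
        props.setProperty(ASSIGNMENT_STRATEGY, previousAssignors + "," + COOPERATIVE_STICKY);
        return props;
    }

    // Plain consumer, second rolling bounce: only the cooperative assignor remains.
    static Properties consumerSecondBounce() {
        Properties props = new Properties();
        props.setProperty(ASSIGNMENT_STRATEGY, COOPERATIVE_STICKY);
        return props;
    }

    // Streams, first rolling bounce: set upgrade.from to the version being upgraded from.
    static Properties streamsFirstBounce(String oldVersion) {
        Properties props = new Properties();
        props.setProperty(UPGRADE_FROM, oldVersion);
        return props;
    }

    // Streams, second rolling bounce: same config with upgrade.from removed.
    static Properties streamsSecondBounce(Properties firstBounce) {
        Properties props = new Properties();
        props.putAll(firstBounce);
        props.remove(UPGRADE_FROM);
        return props;
    }

    public static void main(String[] args) {
        System.out.println("consumer bounce 1: " + consumerFirstBounce(RANGE));
        System.out.println("consumer bounce 2: " + consumerSecondBounce());
        Properties streams = streamsFirstBounce("2.3");
        System.out.println("streams bounce 1:  " + streams);
        System.out.println("streams bounce 2:  " + streamsSecondBounce(streams));
    }
}
```

In a real application these properties would be merged into the existing consumer or Streams configuration, with every member of the group bounced once per phase.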
