kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9053: AssignmentInfo#encode hardcodes the LATEST_SUPPORTED_VERSION (#7537)
Date Thu, 17 Oct 2019 05:24:13 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 8c1d33f  KAFKA-9053: AssignmentInfo#encode hardcodes the LATEST_SUPPORTED_VERSION
(#7537)
8c1d33f is described below

commit 8c1d33fba1b2ab22818e0a91d9758dadecb20118
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Wed Oct 16 22:23:15 2019 -0700

    KAFKA-9053: AssignmentInfo#encode hardcodes the LATEST_SUPPORTED_VERSION (#7537)
    
    Also put in some additional logging that makes sense to add, and proved helpful in debugging
this particular issue.
    
    Unit tests verifying the encoded supported version were added.
    
    This should get cherry-picked back to 2.1
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../internals/StreamsPartitionAssignor.java        | 22 ++++++++++++++++++----
 .../internals/assignment/AssignmentInfo.java       | 22 +++++++++++-----------
 .../internals/assignment/SubscriptionInfo.java     |  4 ++--
 .../internals/assignment/AssignmentInfoTest.java   |  9 +++++++++
 .../internals/assignment/SubscriptionInfoTest.java | 17 +++++++++++++++++
 5 files changed, 57 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 8b2c95a..78ee40c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -383,10 +383,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
         }
 
         if (minReceivedMetadataVersion < LATEST_SUPPORTED_VERSION) {
-            log.info("Downgrading metadata to version {}. Latest supported version is {}.",
+            log.info("Downgrade metadata to version {}. Latest supported version is {}.",
                 minReceivedMetadataVersion,
                 LATEST_SUPPORTED_VERSION);
         }
+        if (minSupportedMetadataVersion < LATEST_SUPPORTED_VERSION) {
+            log.info("Downgrade latest supported metadata to version {}. Latest supported
version is {}.",
+                minSupportedMetadataVersion,
+                LATEST_SUPPORTED_VERSION);
+        }
 
         log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
 
@@ -1055,9 +1060,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
                 log.info(
                     "Sent a version {} subscription and got version {} assignment back (successful
version probing). "
                         +
-                        "Downgrade subscription metadata to commonly supported version and
trigger new rebalance.",
+                        "Downgrade subscription metadata to commonly supported version {}
and trigger new rebalance.",
                     usedSubscriptionMetadataVersion,
-                    receivedAssignmentMetadataVersion
+                    receivedAssignmentMetadataVersion,
+                    latestCommonlySupportedVersion
                 );
                 usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
                 return true;
@@ -1237,7 +1243,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
     }
 
     private int updateMinSupportedVersion(final int supportedVersion, final int minSupportedMetadataVersion)
{
-        return supportedVersion < minSupportedMetadataVersion ? supportedVersion : minSupportedMetadataVersion;
+        if (supportedVersion < minSupportedMetadataVersion) {
+            log.debug("Downgrade the current minimum supported version {} to the smaller
seen supported version {}",
+                minSupportedMetadataVersion, supportedVersion);
+            return supportedVersion;
+        } else {
+            log.debug("Current minimum supported version remains at {}, last seen supported
version was {}",
+                minSupportedMetadataVersion, supportedVersion);
+            return minSupportedMetadataVersion;
+        }
     }
 
     protected void setAssignmentErrorCode(final Integer errorCode) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index dcaaa9a..a36391e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -145,7 +145,7 @@ public class AssignmentInfo {
                     break;
                 default:
                     throw new IllegalStateException("Unknown metadata version: " + usedVersion
-                        + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+                        + "; latest commonly supported version: " + commonlySupportedVersion);
             }
 
             out.flush();
@@ -245,14 +245,14 @@ public class AssignmentInfo {
 
     private void encodeVersionThree(final DataOutputStream out) throws IOException {
         out.writeInt(3);
-        out.writeInt(LATEST_SUPPORTED_VERSION);
+        out.writeInt(commonlySupportedVersion);
         encodeActiveAndStandbyTaskAssignment(out);
         encodePartitionsByHost(out);
     }
 
     private void encodeVersionFour(final DataOutputStream out) throws IOException {
         out.writeInt(4);
-        out.writeInt(LATEST_SUPPORTED_VERSION);
+        out.writeInt(commonlySupportedVersion);
         encodeActiveAndStandbyTaskAssignment(out);
         encodePartitionsByHost(out);
         out.writeInt(errCode);
@@ -260,7 +260,7 @@ public class AssignmentInfo {
 
     private void encodeVersionFive(final DataOutputStream out) throws IOException {
         out.writeInt(5);
-        out.writeInt(LATEST_SUPPORTED_VERSION);
+        out.writeInt(commonlySupportedVersion);
         encodeActiveAndStandbyTaskAssignment(out);
         encodePartitionsByHostAsDictionary(out);
         out.writeInt(errCode);
@@ -277,7 +277,7 @@ public class AssignmentInfo {
             final AssignmentInfo assignmentInfo;
 
             final int usedVersion = in.readInt();
-            final int latestSupportedVersion;
+            final int commonlySupportedVersion;
             switch (usedVersion) {
                 case 1:
                     assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
@@ -288,18 +288,18 @@ public class AssignmentInfo {
                     decodeVersionTwoData(assignmentInfo, in);
                     break;
                 case 3:
-                    latestSupportedVersion = in.readInt();
-                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeVersionThreeData(assignmentInfo, in);
                     break;
                 case 4:
-                    latestSupportedVersion = in.readInt();
-                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeVersionFourData(assignmentInfo, in);
                     break;
                 case 5:
-                    latestSupportedVersion = in.readInt();
-                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeVersionFiveData(assignmentInfo, in);
                     break;
                 default:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 81430ff..20f2ce0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -212,7 +212,7 @@ public class SubscriptionInfo {
         final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeFourAndFiveByteLength(endPointBytes));
 
         buf.putInt(usedVersion); // used version
-        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+        buf.putInt(latestSupportedVersion); // supported version
         encodeClientUUID(buf);
         encodeTasks(buf, prevTasks);
         encodeTasks(buf, standbyTasks);
@@ -272,7 +272,7 @@ public class SubscriptionInfo {
             default:
                 latestSupportedVersion = data.getInt();
                 subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
-                log.info("Unable to decode subscription data: used version: {}; latest supported
version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
+                log.info("Unable to decode subscription data: used version: {}; latest supported
version: {}", usedVersion, latestSupportedVersion);
         }
 
         return subscriptionInfo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index b65994e..e9f0237 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -103,4 +103,13 @@ public class AssignmentInfoTest {
         final AssignmentInfo expectedInfo = new AssignmentInfo(5, LATEST_SUPPORTED_VERSION,
activeTasks, standbyTasks, globalAssignment, 2);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
+
+    @Test
+    public void shouldEncodeAndDecodeSmallerCommonlySupportedVersion() {
+        final int usedVersion = LATEST_SUPPORTED_VERSION - 1;
+        final int commonlySupportedVersion = LATEST_SUPPORTED_VERSION - 1;
+        final AssignmentInfo info = new AssignmentInfo(usedVersion, commonlySupportedVersion,
activeTasks, standbyTasks, globalAssignment, 2);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion,
activeTasks, standbyTasks, globalAssignment, 2);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index a6150c0..a0806a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -86,12 +86,29 @@ public class SubscriptionInfoTest {
     }
 
     @Test
+    public void shouldEncodeAndDecodeVersion5() {
+        final SubscriptionInfo info = new SubscriptionInfo(5, processId, activeTasks, standbyTasks,
"localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION,
processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
+    }
+
+    @Test
     public void shouldAllowToDecodeFutureSupportedVersion() {
         final SubscriptionInfo info = SubscriptionInfo.decode(encodeFutureVersion());
         assertEquals(LATEST_SUPPORTED_VERSION + 1, info.version());
         assertEquals(LATEST_SUPPORTED_VERSION + 1, info.latestSupportedVersion());
     }
 
+    @Test
+    public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
+        final int usedVersion = LATEST_SUPPORTED_VERSION - 1;
+        final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
+
+        final SubscriptionInfo info = new SubscriptionInfo(usedVersion, latestSupportedVersion,
processId, activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion,
processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
+    }
+
     private ByteBuffer encodeFutureVersion() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
                                                    + 4 /* supported version */);


Mime
View raw message