kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.1 updated: Fix bug in AssignmentInfo#encode and add additional logging (#7545)
Date Thu, 17 Oct 2019 22:56:10 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 16e3c52  Fix bug in AssignmentInfo#encode and add additional logging (#7545)
16e3c52 is described below

commit 16e3c52f1d8b12106add2db87c6c622bbc932ac0
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Thu Oct 17 15:09:55 2019 -0700

    Fix bug in AssignmentInfo#encode and add additional logging (#7545)
    
    Same as #7537
    but targeted at 2.3 for cherry-pick
    Reviewers: Bill Bejeck <bbejeck@gmail.com>
---
 .../internals/StreamsPartitionAssignor.java        | 24 ++++++++++++++++------
 .../internals/assignment/AssignmentInfo.java       | 16 +++++++--------
 .../internals/assignment/SubscriptionInfo.java     |  9 ++++----
 .../internals/assignment/AssignmentInfoTest.java   |  9 ++++++++
 .../internals/assignment/SubscriptionInfoTest.java | 10 +++++++++
 5 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index f99866e..e98ae3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -416,9 +416,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 minReceivedMetadataVersion = usedVersion;
             }
 
-            final int latestSupportedVersion = info.latestSupportedVersion();
-            if (latestSupportedVersion < minSupportedMetadataVersion) {
-                minSupportedMetadataVersion = latestSupportedVersion;
+            final int supportedVersion = info.latestSupportedVersion();
+
+            if (supportedVersion < minSupportedMetadataVersion) {
+                log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}",
+                    minSupportedMetadataVersion, supportedVersion);
+                minSupportedMetadataVersion = supportedVersion;
+            } else {
+                log.debug("Current minimum supported version remains at {}, last seen supported version was {}",
+                    minSupportedMetadataVersion, supportedVersion);
             }
 
             // create the new client metadata if necessary
@@ -449,10 +455,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         if (minReceivedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
-            log.info("Downgrading metadata to version {}. Latest supported version is {}.",
+            log.info("Downgrade metadata to version {}. Latest supported version is {}.",
                 minReceivedMetadataVersion,
                 SubscriptionInfo.LATEST_SUPPORTED_VERSION);
         }
+        if (minSupportedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+            log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.",
+                minSupportedMetadataVersion,
+                SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+        }
 
         log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
 
@@ -884,9 +895,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 log.info(
                     "Sent a version {} subscription and got version {} assignment back (successful version probing). "
                         +
-                        "Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
+                        "Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.",
                     usedSubscriptionMetadataVersion,
-                    receivedAssignmentMetadataVersion
+                    receivedAssignmentMetadataVersion,
+                    latestCommonlySupportedVersion
                 );
                 usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
                 return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 5c7b037..3521def 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -143,7 +143,7 @@ public class AssignmentInfo {
                     break;
                 default:
                     throw new IllegalStateException("Unknown metadata version: " + usedVersion
-                        + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+                        + "; latest commonly supported version: " + commonlySupportedVersion);
             }
 
             out.flush();
@@ -206,14 +206,14 @@ public class AssignmentInfo {
 
     private void encodeVersionThree(final DataOutputStream out) throws IOException {
         out.writeInt(3);
-        out.writeInt(LATEST_SUPPORTED_VERSION);
+        out.writeInt(commonlySupportedVersion);
         encodeActiveAndStandbyTaskAssignment(out);
         encodePartitionsByHost(out);
     }
 
     private void encodeVersionFour(final DataOutputStream out) throws IOException {
         out.writeInt(4);
-        out.writeInt(LATEST_SUPPORTED_VERSION);
+        out.writeInt(commonlySupportedVersion);
         encodeActiveAndStandbyTaskAssignment(out);
         encodePartitionsByHost(out);
         out.writeInt(errCode);
@@ -230,7 +230,7 @@ public class AssignmentInfo {
             final AssignmentInfo assignmentInfo;
 
             final int usedVersion = in.readInt();
-            final int latestSupportedVersion;
+            final int commonlySupportedVersion;
             switch (usedVersion) {
                 case 1:
                     assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
@@ -241,13 +241,13 @@ public class AssignmentInfo {
                     decodeVersionTwoData(assignmentInfo, in);
                     break;
                 case 3:
-                    latestSupportedVersion = in.readInt();
-                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeVersionThreeData(assignmentInfo, in);
                     break;
                 case 4:
-                    latestSupportedVersion = in.readInt();
-                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeVersionFourData(assignmentInfo, in);
                     break;
                 default:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index b4ad19f..57db25b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -207,11 +207,10 @@ public class SubscriptionInfo {
 
     private ByteBuffer encodeVersionThree() {
         final byte[] endPointBytes = prepareUserEndPoint();
-
         final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
+        buf.putInt(3);
+        buf.putInt(latestSupportedVersion);
 
-        buf.putInt(3); // used version
-        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
         encodeClientUUID(buf);
         encodeTasks(buf, prevTasks);
         encodeTasks(buf, standbyTasks);
@@ -226,7 +225,7 @@ public class SubscriptionInfo {
         final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
 
         buf.putInt(4); // used version
-        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+        buf.putInt(latestSupportedVersion); // supported version
         encodeClientUUID(buf);
         encodeTasks(buf, prevTasks);
         encodeTasks(buf, standbyTasks);
@@ -273,7 +272,7 @@ public class SubscriptionInfo {
             default:
                 latestSupportedVersion = data.getInt();
                 subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
-                log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
+                log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, latestSupportedVersion);
         }
 
         return subscriptionInfo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 8b99065..3ed2f71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -94,4 +94,13 @@ public class AssignmentInfoTest {
         final AssignmentInfo expectedInfo = new AssignmentInfo(4, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
+
+    @Test
+    public void shouldEncodeAndDecodeSmallerCommonlySupportedVersion() {
+        final int usedVersion = AssignmentInfo.LATEST_SUPPORTED_VERSION - 1;
+        final int commonlySupportedVersion = AssignmentInfo.LATEST_SUPPORTED_VERSION - 1;
+        final AssignmentInfo info = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 0);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 0);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 2a75c57..6492cc0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -90,6 +90,16 @@ public class SubscriptionInfoTest {
         assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.latestSupportedVersion());
     }
 
+    @Test
+    public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
+        final int usedVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION - 1;
+        final int latestSupportedVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION - 1;
+
+        final SubscriptionInfo info = new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
+    }
+
     private ByteBuffer encodeFutureVersion() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
                                                    + 4 /* supported version */);


Mime
View raw message