kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-9568: enforce rebalance if client endpoint has changed (#8299)
Date Thu, 19 Mar 2020 01:38:13 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 85c96f5  KAFKA-9568: enforce rebalance if client endpoint has changed (#8299)
85c96f5 is described below

commit 85c96f523090cce5fd8767c0fd3505ee4885f63d
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Wed Mar 18 18:37:38 2020 -0700

    KAFKA-9568: enforce rebalance if client endpoint has changed (#8299)
    
    Since the assignment info includes a map with all members' host info, we can just check
    the received map to make sure our endpoint is contained. If not, we need to force the group
    to rebalance and get our updated endpoint info.
    
    Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 16 ++-----
 .../org/apache/kafka/streams/KeyQueryMetadata.java |  5 +--
 .../streams/processor/internals/StreamThread.java  |  4 +-
 .../processor/internals/StreamsMetadataState.java  |  4 +-
 .../internals/StreamsPartitionAssignor.java        | 50 ++++++++++++---------
 .../internals/assignment/AssignorError.java        |  2 +-
 .../org/apache/kafka/streams/state/HostInfo.java   | 31 +++++++++++++
 .../kafka/streams/state/StreamsMetadata.java       |  2 +-
 .../internals/StreamsPartitionAssignorTest.java    | 52 +++++++++++++++++++---
 .../apache/kafka/streams/state/HostInfoTest.java   | 52 ++++++++++++++++++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  2 +-
 11 files changed, 172 insertions(+), 48 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2a901ed..2807e62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -90,8 +89,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.common.utils.Utils.getHost;
-import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
@@ -808,17 +805,12 @@ public class KafkaStreams implements AutoCloseable {
     }
 
     private static HostInfo parseHostInfo(final String endPoint) {
-        if (endPoint == null || endPoint.trim().isEmpty()) {
+        final HostInfo hostInfo = HostInfo.buildFromEndpoint(endPoint);
+        if (hostInfo == null) {
             return StreamsMetadataState.UNKNOWN_HOST;
+        } else {
+            return hostInfo;
         }
-        final String host = getHost(endPoint);
-        final Integer port = getPort(endPoint);
-
-        if (host == null || port == null) {
-            throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
-        }
-
-        return new HostInfo(host, port);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
index 9165282..0a3c9b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
@@ -34,9 +34,8 @@ public class KeyQueryMetadata {
      * Sentinel to indicate that the KeyQueryMetadata is currently unavailable. This can occur during rebalance
      * operations.
      */
-    public static final KeyQueryMetadata NOT_AVAILABLE = new KeyQueryMetadata(new HostInfo("unavailable", -1),
-            Collections.emptySet(),
-            -1);
+    public static final KeyQueryMetadata NOT_AVAILABLE =
+        new KeyQueryMetadata(HostInfo.unavailable(), Collections.emptySet(), -1);
 
     private final HostInfo activeHost;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb0179c..adb6f04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -496,9 +496,7 @@ public class StreamThread extends Thread {
         while (isRunning() || taskManager.isRebalanceInProgress()) {
             try {
                 runOnce();
-                if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
-                    log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
-
+                if (assignmentErrorCode.get() == AssignorError.REBALANCE_NEEDED.code()) {
                     assignmentErrorCode.set(AssignorError.NONE.code());
                     mainConsumer.enforceRebalance();
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index a84f9e7..b554fc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -44,7 +44,7 @@ import java.util.Set;
  * in a KafkaStreams application
  */
 public class StreamsMetadataState {
-    public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1);
+    public static final HostInfo UNKNOWN_HOST = HostInfo.unavailable();
     private final InternalTopologyBuilder builder;
     private final Set<String> globalStores;
     private final HostInfo thisHost;
@@ -366,7 +366,7 @@ public class StreamsMetadataState {
             matchingPartitions.add(new TopicPartition(sourceTopic, partition));
         }
 
-        HostInfo activeHost = new HostInfo("unavailable", -1);
+        HostInfo activeHost = UNKNOWN_HOST;
         final Set<HostInfo> standbyHosts = new HashSet<>();
         for (final StreamsMetadata streamsMetadata : allMetadata) {
             final Set<String> activeStateStoreNames = streamsMetadata.stateStoreNames();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 79764a2..3637431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -59,8 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static java.util.UUID.randomUUID;
-import static org.apache.kafka.common.utils.Utils.getHost;
-import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
@@ -109,21 +106,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
 
         ClientMetadata(final String endPoint) {
 
-            // get the host info if possible
-            if (endPoint != null) {
-                final String host = getHost(endPoint);
-                final Integer port = getPort(endPoint);
-
-                if (host == null || port == null) {
-                    throw new ConfigException(
-                        String.format("Error parsing host address %s. Expected format host:port.", endPoint)
-                    );
-                }
-
-                hostInfo = new HostInfo(host, port);
-            } else {
-                hostInfo = null;
-            }
+            // get the host info, or null if no endpoint is configured (ie endPoint == null)
+            hostInfo = HostInfo.buildFromEndpoint(endPoint);
 
             // initialize the consumer memberIds
             consumers = new HashSet<>();
@@ -1234,7 +1218,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
 
         // Check if this was a version probing rebalance and check the error code to trigger another rebalance if so
         if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
-            setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
+            log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
+            setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
         }
 
         // version 1 field
@@ -1280,6 +1265,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 );
         }
 
+        verifyHostInfo(partitionsByHost.keySet());
+
         final Cluster fakeCluster = Cluster.empty().withPartitions(topicToPartitionInfo);
         streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fakeCluster);
 
@@ -1288,6 +1275,25 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         taskManager.handleAssignment(activeTasks, info.standbyTasks());
     }
 
+    /**
+     * Verify that this client's host info was included in the map returned in the assignment, and trigger a
+     * rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed
+     * back its original assignment to avoid an unnecessary rebalance. If the client's endpoint has changed, we need
+     * to force a rebalance for the other members in the group to get the updated host info for this client.
+     *
+     * @param groupHostInfo the HostInfo of all clients in the group
+     */
+    private void verifyHostInfo(final Set<HostInfo> groupHostInfo) {
+        if (userEndPoint != null && !groupHostInfo.isEmpty()) {
+            final HostInfo myHostInfo = HostInfo.buildFromEndpoint(userEndPoint);
+
+            if (!groupHostInfo.contains(myHostInfo)) {
+                log.info("Triggering a rebalance to update group with new endpoint = {}", userEndPoint);
+                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+            }
+        }
+    }
+
     // protected for upgrade test
     protected static Map<TaskId, Set<TopicPartition>> getActiveTasks(final List<TopicPartition> partitions, final AssignmentInfo info) {
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1299,7 +1305,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         return activeTasks;
     }
 
-    private static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
+    static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
         final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
         for (final Set<TopicPartition> value : partitionsByHost.values()) {
             for (final TopicPartition topicPartition : value) {
@@ -1392,6 +1398,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         assignmentErrorCode.set(errorCode);
     }
 
+    Integer assignmentErrorCode() {
+        return assignmentErrorCode.get();
+    }
+
     // following functions are for test only
     void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
index 3baf4f5..259c3db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
 public enum AssignorError {
     NONE(0),
     INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-    VERSION_PROBING(2);
+    REBALANCE_NEEDED(2);
 
     private final int code;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 58cdba6..6293cf5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.state;
 
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -46,6 +50,33 @@ public class HostInfo {
         this.port = port;
     }
 
+    /**
+     * @throws ConfigException if the host or port cannot be parsed from the given endpoint string
+     * @return a new HostInfo or null if endPoint is null or has no characters
+     */
+    public static HostInfo buildFromEndpoint(final String endPoint) {
+        if (endPoint == null || endPoint.trim().isEmpty()) {
+            return null;
+        }
+
+        final String host = getHost(endPoint);
+        final Integer port = getPort(endPoint);
+
+        if (host == null || port == null) {
+            throw new ConfigException(
+                String.format("Error parsing host address %s. Expected format host:port.", endPoint)
+            );
+        }
+        return new HostInfo(host, port);
+    }
+
+    /**
+     * @return a sentinel for cases where the host metadata is currently unavailable, eg during rebalance operations.
+     */
+    public static HostInfo unavailable() {
+        return new HostInfo("unavailable", -1);
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
index b1d3f4a..1715cf4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -35,7 +35,7 @@ public class StreamsMetadata {
      * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
      * operations.
      */
-    public final static StreamsMetadata NOT_AVAILABLE = new StreamsMetadata(new HostInfo("unavailable", -1),
+    public final static StreamsMetadata NOT_AVAILABLE = new StreamsMetadata(HostInfo.unavailable(),
                                                                             Collections.emptySet(),
                                                                             Collections.emptySet(),
                                                                             Collections.emptySet(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 02062f6..de65281 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
@@ -68,12 +69,14 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1281,15 +1284,54 @@ public class StreamsPartitionAssignorTest {
 
     @Test
     public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() {
-        final TopicPartition partitionOne = new TopicPartition("topic", 1);
-        final TopicPartition partitionTwo = new TopicPartition("topic", 2);
-        final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(
-            new HostInfo("localhost", 9090), mkSet(partitionOne, partitionTwo));
+        final Map<HostInfo, Set<TopicPartition>> initialHostState = mkMap(
+            mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)),
+            mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1))
+            );
+
+        final Map<HostInfo, Set<TopicPartition>> newHostState = mkMap(
+            mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)),
+            mkEntry(new HostInfo("newotherhost", 9090), mkSet(t2p0, t2p1))
+        );
+
+        streamsMetadataState = EasyMock.createStrictMock(StreamsMetadataState.class);
+
+        streamsMetadataState.onChange(EasyMock.eq(initialHostState), EasyMock.anyObject(), EasyMock.anyObject());
+        streamsMetadataState.onChange(EasyMock.eq(newHostState), EasyMock.anyObject(), EasyMock.anyObject());
+        EasyMock.replay(streamsMetadataState);
 
         createDefaultMockTaskManager();
         configureDefaultPartitionAssignor();
 
-        partitionAssignor.onAssignment(createAssignment(hostState), null);
+        partitionAssignor.onAssignment(createAssignment(initialHostState), null);
+        partitionAssignor.onAssignment(createAssignment(newHostState), null);
+
+        EasyMock.verify(taskManager, streamsMetadataState);
+    }
+
+    @Test
+    public void shouldTriggerRebalanceOnHostInfoChange() {
+        final Map<HostInfo, Set<TopicPartition>> oldHostState = mkMap(
+            mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)),
+            mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1))
+        );
+
+        final Map<HostInfo, Set<TopicPartition>> newHostState = mkMap(
+            mkEntry(new HostInfo("newhost", 9090), mkSet(t1p0, t1p1)),
+            mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1))
+        );
+
+        createDefaultMockTaskManager();
+        configurePartitionAssignorWith(singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, "newhost:9090"));
+
+        partitionAssignor.onAssignment(createAssignment(oldHostState), null);
+
+        assertThat(partitionAssignor.assignmentErrorCode(), is(AssignorError.REBALANCE_NEEDED.code()));
+
+        partitionAssignor.setAssignmentErrorCode(AssignorError.NONE.code());
+        partitionAssignor.onAssignment(createAssignment(newHostState), null);
+
+        assertThat(partitionAssignor.assignmentErrorCode(), is(AssignorError.NONE.code()));
 
         EasyMock.verify(taskManager);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/HostInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/state/HostInfoTest.java
new file mode 100644
index 0000000..0bee8e6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/HostInfoTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+public class HostInfoTest {
+    
+    @Test
+    public void shouldCreateHostInfo() {
+        final String endPoint = "host:9090";
+        final HostInfo hostInfo = HostInfo.buildFromEndpoint(endPoint);
+
+        assertThat(hostInfo.host(), is("host"));
+        assertThat(hostInfo.port(), is(9090));
+    }
+
+    @Test
+    public void shouldReturnNullHostInfoForNullEndPoint() {
+        assertNull(HostInfo.buildFromEndpoint(null));
+    }
+
+    @Test
+    public void shouldReturnNullHostInfoForEmptyEndPoint() {
+        assertNull(HostInfo.buildFromEndpoint("  "));
+    }
+
+    @Test
+    public void shouldThrowConfigExceptionForNonsenseEndPoint() {
+        assertThrows(ConfigException.class, () -> HostInfo.buildFromEndpoint("nonsense"));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 8862008..78903c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -194,7 +194,7 @@ public class StreamsUpgradeTest {
                 assignment.userData().putInt(0, LATEST_SUPPORTED_VERSION));
 
             if (maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) {
-                setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
+                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
                 usedSubscriptionMetadataVersionPeek.set(usedSubscriptionMetadataVersion);
             }
 


Mime
View raw message