kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.6 updated: MINOR: remove unnecessary timeout for admin request (#8738)
Date Fri, 29 May 2020 01:35:25 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 3229e90  MINOR: remove unnecessary timeout for admin request (#8738)
3229e90 is described below

commit 3229e909ef78143143efe7ad8470abd6dd3fe451
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Thu May 28 18:01:01 2020 -0700

    MINOR: remove unnecessary timeout for admin request (#8738)
    
    Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  5 ++--
 .../kafka/streams/kstream/CogroupedKStream.java    |  2 +-
 .../org/apache/kafka/streams/kstream/KStream.java  |  2 +-
 .../streams/processor/internals/ClientUtils.java   | 30 ++++++++++------------
 .../processor/internals/InternalTopicManager.java  | 13 +++-------
 .../streams/processor/internals/StreamThread.java  |  1 +
 .../internals/StreamsPartitionAssignor.java        |  5 +---
 .../assignment/AssignorConfiguration.java          |  5 ----
 .../integration/TaskAssignorIntegrationTest.java   |  9 +------
 .../processor/internals/ClientUtilsTest.java       | 22 ++++++++--------
 .../internals/StreamsPartitionAssignorTest.java    | 12 ---------
 11 files changed, 35 insertions(+), 71 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 7a7e4df..263bf9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -92,7 +92,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsWithoutTimeout;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
 
 /**
  * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -1230,6 +1230,7 @@ public class KafkaStreams implements AutoCloseable {
      * of invocation to once every few seconds.
      *
      * @return map of store names to another map of partition to {@link LagInfo}s
+     * @throws StreamsException if the admin client request throws exception
      */
     public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {
         final Map<String, Map<Integer, LagInfo>> localStorePartitionLags = new TreeMap<>();
@@ -1246,7 +1247,7 @@ public class KafkaStreams implements AutoCloseable {
         }
 
         log.debug("Current changelog positions: {}", allChangelogPositions);
-        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient);
+        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsets(allPartitions, adminClient);
         log.debug("Current end offsets :{}", allEndOffsets);
 
         for (final Map.Entry<TopicPartition, ListOffsetsResultInfo> entry : allEndOffsets.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
index aba2f6b..59e2fe7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
@@ -45,7 +45,7 @@ public interface CogroupedKStream<K, VOut> {
      * streams of this {@code CogroupedKStream}.
      * If this is not the case, you would need to call {@link KStream#repartition(Repartitioned)} before
      * {@link KStream#groupByKey() grouping} the {@link KStream} and specify the "correct" number of
-     * partitions via {@link Repartitioned) parameter.
+     * partitions via {@link Repartitioned} parameter.
      * <p>
      * The specified {@link Aggregator} is applied in the actual {@link #aggregate(Initializer) aggregation} step for
      * each input record and computes a new aggregate using the current aggregate (or for the very first record per key
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8f99c9d..bcc911c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -3324,7 +3324,7 @@ public interface KStream<K, V> {
      * }
      * }</pre>
      * Even if any upstream operation was key-changing, no auto-repartition is triggered.
-     * If repartitioning is required, a call to {@link #repartition() should be performed before
+     * If repartitioning is required, a call to {@link #repartition()} should be performed before
      * {@code transformValues()}.
      * <p>
      * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index 33aee64..613565d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -24,25 +25,29 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 
-import java.time.Duration;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClientUtils {
-
     private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
 
+    public static final class QuietAdminClientConfig extends AdminClientConfig {
+        QuietAdminClientConfig(final StreamsConfig streamsConfig) {
+            // If you just want to look up admin configs, you don't care about the clientId
+            super(streamsConfig.getAdminConfigs("dummy"), false);
+        }
+    }
+
     // currently admin client is shared among all threads
     public static String getSharedAdminClientId(final String clientId) {
         return clientId + "-admin";
@@ -90,25 +95,16 @@ public class ClientUtils {
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final
Collection<TopicPartition> partitions,
-                                                                                        
  final Admin adminClient) {
-        return fetchEndOffsets(partitions, adminClient, null);
-    }
-
     public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final
Collection<TopicPartition> partitions,
-                                                                             final Admin
adminClient,
-                                                                             final Duration
timeout) {
+                                                                             final Admin
adminClient) {
         final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
         try {
             final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future
=  adminClient.listOffsets(
                 partitions.stream().collect(Collectors.toMap(Function.identity(), tp ->
OffsetSpec.latest())))
                                                                                         .all();
-            if (timeout == null) {
-                endOffsets = future.get();
-            } else {
-                endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-            }
-        } catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) {
+            endOffsets = future.get();
+
+        } catch (final RuntimeException | InterruptedException | ExecutionException e) {
             LOG.warn("listOffsets request failed.", e);
             throw new StreamsException("Unable to obtain end offsets from kafka", e);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index a05aa48..dbbdb22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.internals.ClientUtils.QuietAdminClientConfig;
 import org.slf4j.Logger;
 
 import java.util.HashMap;
@@ -44,12 +45,6 @@ public class InternalTopicManager {
     private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. " +
         "Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
 
-    private static final class InternalAdminClientConfig extends AdminClientConfig {
-        private InternalAdminClientConfig(final Map<?, ?> props) {
-            super(props, false);
-        }
-    }
-
     private final Logger log;
     private final long windowChangeLogAdditionalRetention;
     private final Map<String, String> defaultTopicConfigs = new HashMap<>();
@@ -68,9 +63,9 @@ public class InternalTopicManager {
 
         replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
         windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
-        final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
-        retries = dummyAdmin.getInt(AdminClientConfig.RETRIES_CONFIG);
-        retryBackOffMs = dummyAdmin.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
+        final QuietAdminClientConfig adminConfigs = new QuietAdminClientConfig(streamsConfig);
+        retries = adminConfigs.getInt(AdminClientConfig.RETRIES_CONFIG);
+        retryBackOffMs = adminConfigs.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
 
         log.debug("Configs:" + Utils.NL +
             "\t{} = {}" + Utils.NL +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index fb73a66..6e7a3aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -551,6 +551,7 @@ public class StreamThread extends Thread {
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
                     mainConsumer.enforceRebalance();
+                    nextProbingRebalanceMs.set(Long.MAX_VALUE);
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 7e207d2..a6fbdfb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -48,7 +48,6 @@ import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -168,7 +167,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
     protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
 
     private Admin adminClient;
-    private int adminClientTimeout;
     private InternalTopicManager internalTopicManager;
     private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
     private RebalanceProtocol rebalanceProtocol;
@@ -200,7 +198,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
         partitionGrouper = assignorConfiguration.partitionGrouper();
         userEndPoint = assignorConfiguration.userEndPoint();
         adminClient = assignorConfiguration.adminClient();
-        adminClientTimeout = assignorConfiguration.adminClientTimeout();
         internalTopicManager = assignorConfiguration.internalTopicManager();
         copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
         rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
@@ -773,7 +770,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
             allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
 
             final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient, Duration.ofMillis(adminClientTimeout));
+                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
 
             allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask,
allNewlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 84604ee..2d510a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigDef;
@@ -296,10 +295,6 @@ public final class AssignorConfiguration {
         return adminClient;
     }
 
-    public int adminClientTimeout() {
-        return streamsConfig.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
-    }
-
     public InternalTopicManager internalTopicManager() {
         return new InternalTopicManager(adminClient, streamsConfig);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 9767673..329e7bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.streams.KafkaStreams;
@@ -88,8 +87,7 @@ public class TaskAssignorIntegrationTest {
                 mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
-                mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
-                mkEntry(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 9)
+                mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener)
             )
         );
 
@@ -122,17 +120,12 @@ public class TaskAssignorIntegrationTest {
             final AssignorConfiguration.AssignmentListener actualAssignmentListener =
                 (AssignorConfiguration.AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor);
 
-            final Field adminClientTimeoutField = StreamsPartitionAssignor.class.getDeclaredField("adminClientTimeout");
-            adminClientTimeoutField.setAccessible(true);
-            final int adminClientTimeout =
-                (int) adminClientTimeoutField.get(streamsPartitionAssignor);
 
             assertThat(configs.numStandbyReplicas, is(5));
             assertThat(configs.acceptableRecoveryLag, is(6L));
             assertThat(configs.maxWarmupReplicas, is(7));
             assertThat(configs.probingRebalanceIntervalMs, is(480000L));
             assertThat(actualAssignmentListener, sameInstance(configuredAssignmentListener));
-            assertThat(adminClientTimeout, is(9));
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
index 96b6b62..6d9d8df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
@@ -18,22 +18,19 @@ package org.apache.kafka.streams.processor.internals;
 
 import static java.util.Collections.emptyList;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsWithoutTimeout;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertThrows;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.easymock.EasyMock;
 import org.junit.Test;
@@ -45,12 +42,12 @@ public class ClientUtilsTest {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException());
         replay(adminClient);
-        assertThrows(StreamsException.class, () ->  fetchEndOffsetsWithoutTimeout(emptyList(),
adminClient));
+        assertThrows(StreamsException.class, () ->  fetchEndOffsets(emptyList(), adminClient));
         verify(adminClient);
     }
 
     @Test
-    public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws
InterruptedException, ExecutionException {
+    public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws
Exception {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
         final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture
= EasyMock.createMock(KafkaFuture.class);
@@ -60,12 +57,12 @@ public class ClientUtilsTest {
         EasyMock.expect(allFuture.get()).andThrow(new InterruptedException());
         replay(adminClient, result, allFuture);
 
-        assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(),
adminClient));
+        assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
         verify(adminClient);
     }
 
     @Test
-    public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws
InterruptedException, ExecutionException {
+    public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws
Exception {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
         final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture
= EasyMock.createMock(KafkaFuture.class);
@@ -75,22 +72,23 @@ public class ClientUtilsTest {
         EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException()));
         replay(adminClient, result, allFuture);
 
-        assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(),
adminClient));
+        assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
         verify(adminClient);
     }
 
     @Test
-    public void fetchEndOffsetsWithTimeoutShouldRethrowTimeoutExceptionAsStreamsException()
throws InterruptedException, ExecutionException, TimeoutException {
+    public void fetchEndOffsetsWithTimeoutShouldRethrowTimeoutExceptionAsStreamsException()
throws Exception {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
         final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture
= EasyMock.createMock(KafkaFuture.class);
 
         EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
         EasyMock.expect(result.all()).andStubReturn(allFuture);
-        EasyMock.expect(allFuture.get(1L, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException());
+        EasyMock.expect(allFuture.get()).andThrow(new TimeoutException());
         replay(adminClient, result, allFuture);
 
-        assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient,
Duration.ofMillis(1)));
+        assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
         verify(adminClient);
     }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 82edeb0..350a598 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
@@ -1791,17 +1790,6 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldSetAdminClientTimeout() {
-        createDefaultMockTaskManager();
-
-        final Map<String, Object> props = configProps();
-        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 2 * 60 * 1000);
-        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props);
-
-        assertThat(assignorConfiguration.adminClientTimeout(), is(2 * 60 * 1000));
-    }
-
-    @Test
     public void shouldGetNextProbingRebalanceMs() {
         nextScheduledRebalanceMs.set(5 * 60 * 1000L);
 


Mime
View raw message