kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10167: use the admin client to read end-offset (#8876)
Date Thu, 18 Jun 2020 18:29:46 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d8cc6fe  KAFKA-10167: use the admin client to read end-offset (#8876)
d8cc6fe is described below

commit d8cc6fe8e36329c647736773d9d66de89c447409
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Jun 18 11:28:49 2020 -0700

    KAFKA-10167: use the admin client to read end-offset (#8876)
    
    Since admin client allows use to use flexible offset-spec, we can always set to use read-uncommitted regardless of the EOS config.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/admin/ListOffsetsResult.java     |   4 +-
 .../org/apache/kafka/clients/admin/OffsetSpec.java |   6 +-
 .../kafka/clients/admin/MockAdminClient.java       |  38 +++++-
 .../processor/internals/ChangelogRegister.java     |   2 +-
 .../processor/internals/StoreChangelogReader.java  |  21 +++-
 .../streams/processor/internals/StreamThread.java  |  32 +++--
 .../internals/StoreChangelogReaderTest.java        | 139 ++++++++-------------
 .../processor/internals/StreamThreadTest.java      | 113 ++++++++---------
 .../StreamThreadStateStoreProviderTest.java        |  11 +-
 .../org/apache/kafka/test/MockClientSupplier.java  |   6 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  53 +++-----
 11 files changed, 222 insertions(+), 203 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java
index d830ef2..5eb00de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListOffsetsResult.java
@@ -35,7 +35,7 @@ public class ListOffsetsResult {
 
     private final Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures;
 
-    ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {
+    public ListOffsetsResult(Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures) {
         this.futures = futures;
     }
 
@@ -80,7 +80,7 @@ public class ListOffsetsResult {
         private final long timestamp;
         private final Optional<Integer> leaderEpoch;
 
-        ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {
+        public ListOffsetsResultInfo(long offset, long timestamp, Optional<Integer> leaderEpoch) {
             this.offset = offset;
             this.timestamp = timestamp;
             this.leaderEpoch = leaderEpoch;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 2517985..339e9cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -23,9 +23,9 @@ import java.util.Map;
  */
 public class OffsetSpec {
 
-    static class EarliestSpec extends OffsetSpec { }
-    static class LatestSpec extends OffsetSpec { }
-    static class TimestampSpec extends OffsetSpec {
+    public static class EarliestSpec extends OffsetSpec { }
+    public static class LatestSpec extends OffsetSpec { }
+    public static class TimestampSpec extends OffsetSpec {
         private final long timestamp;
 
         TimestampSpec(long timestamp) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index e5ad575..bf0a9e9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -64,6 +64,8 @@ public class MockAdminClient extends AdminClient {
         new HashMap<>();
     private final Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaMoves =
         new HashMap<>();
+    private final Map<TopicPartition, Long> beginningOffsets;
+    private final Map<TopicPartition, Long> endOffsets;
     private final String clusterId;
     private final List<List<String>> brokerLogDirs;
     private final List<Map<String, String>> brokerConfigs;
@@ -145,8 +147,11 @@ public class MockAdminClient extends AdminClient {
         }
     }
 
-    public MockAdminClient(List<Node> brokers,
-                           Node controller) {
+    public MockAdminClient() {
+        this(Collections.singletonList(Node.noNode()), Node.noNode());
+    }
+
+    public MockAdminClient(List<Node> brokers, Node controller) {
         this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
             Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS));
     }
@@ -167,6 +172,8 @@ public class MockAdminClient extends AdminClient {
         for (int i = 0; i < brokers.size(); i++) {
             this.brokerConfigs.add(new HashMap<>());
         }
+        this.beginningOffsets = new HashMap<>();
+        this.endOffsets = new HashMap<>();
     }
 
     synchronized public void controller(Node controller) {
@@ -789,7 +796,24 @@ public class MockAdminClient extends AdminClient {
 
     @Override
     synchronized public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<TopicPartition, KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>> futures = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetSpec> entry : topicPartitionOffsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            OffsetSpec spec = entry.getValue();
+            KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo> future = new KafkaFutureImpl<>();
+
+            if (spec instanceof OffsetSpec.TimestampSpec)
+                throw new UnsupportedOperationException("Not implement yet");
+            else if (spec instanceof OffsetSpec.EarliestSpec)
+                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(beginningOffsets.get(tp), -1, Optional.empty()));
+            else
+                future.complete(new ListOffsetsResult.ListOffsetsResultInfo(endOffsets.get(tp), -1, Optional.empty()));
+
+            futures.put(tp, future);
+        }
+
+        return new ListOffsetsResult(futures);
     }
 
     @Override
@@ -805,6 +829,14 @@ public class MockAdminClient extends AdminClient {
     @Override
     synchronized public void close(Duration timeout) {}
 
+    public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
+        beginningOffsets.putAll(newOffsets);
+    }
+
+    public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
+        endOffsets.putAll(newOffsets);
+    }
+
     private final static class TopicMetadata {
         final boolean isInternalTopic;
         final List<TopicPartitionInfo> partitions;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
index 7403aad..3f5f977 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 /**
  * See {@link StoreChangelogReader}.
  */
-interface ChangelogRegister {
+public interface ChangelogRegister {
     /**
      * Register a state store for restoration.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index ad5cce0..c6ee4d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -43,6 +48,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
@@ -199,6 +206,9 @@ public class StoreChangelogReader implements ChangelogReader {
     // to update offset limit for standby tasks;
     private Consumer<byte[], byte[]> mainConsumer;
 
+    // the changelog reader needs the admin client to list end offsets
+    private final Admin adminClient;
+
     private long lastUpdateOffsetTime;
 
     void setMainConsumer(final Consumer<byte[], byte[]> consumer) {
@@ -208,11 +218,13 @@ public class StoreChangelogReader implements ChangelogReader {
     public StoreChangelogReader(final Time time,
                                 final StreamsConfig config,
                                 final LogContext logContext,
+                                final Admin adminClient,
                                 final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener stateRestoreListener) {
         this.time = time;
         this.log = logContext.logger(StoreChangelogReader.class);
         this.state = ChangelogReaderState.ACTIVE_RESTORING;
+        this.adminClient = adminClient;
         this.restoreConsumer = restoreConsumer;
         this.stateRestoreListener = stateRestoreListener;
 
@@ -564,8 +576,13 @@ public class StoreChangelogReader implements ChangelogReader {
             return Collections.emptyMap();
 
         try {
-            return restoreConsumer.endOffsets(partitions);
-        } catch (final TimeoutException e) {
+            final ListOffsetsResult result = adminClient.listOffsets(
+                    partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())),
+                    new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED)
+            );
+            return result.all().get().entrySet().stream().collect(
+                    Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
+        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
             // if timeout exception gets thrown we just give up this time and retry in the next run loop
             log.debug("Could not fetch all end offsets for {}, will retry in the next run loop", partitions);
             return Collections.emptyMap();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 91c483c..09be1f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -273,14 +273,12 @@ public class StreamThread extends Thread {
     private volatile ThreadMetadata threadMetadata;
     private StreamThread.StateListener stateListener;
 
-    private final Admin adminClient;
     private final ChangelogReader changelogReader;
-
-    // package-private for testing
-    final ConsumerRebalanceListener rebalanceListener;
-    final Consumer<byte[], byte[]> mainConsumer;
-    final Consumer<byte[], byte[]> restoreConsumer;
-    final InternalTopologyBuilder builder;
+    private final ConsumerRebalanceListener rebalanceListener;
+    private final Consumer<byte[], byte[]> mainConsumer;
+    private final Consumer<byte[], byte[]> restoreConsumer;
+    private final Admin adminClient;
+    private final InternalTopologyBuilder builder;
 
     public static StreamThread create(final InternalTopologyBuilder builder,
                                       final StreamsConfig config,
@@ -309,6 +307,7 @@ public class StreamThread extends Thread {
             time,
             config,
             logContext,
+            adminClient,
             restoreConsumer,
             userStateRestoreListener
         );
@@ -1020,4 +1019,23 @@ public class StreamThread extends Thread {
         return numIterations;
     }
 
+    ConsumerRebalanceListener rebalanceListener() {
+        return rebalanceListener;
+    }
+
+    Consumer<byte[], byte[]> mainConsumer() {
+        return mainConsumer;
+    }
+
+    Consumer<byte[], byte[]> restoreConsumer() {
+        return restoreConsumer;
+    };
+
+    Admin adminClient() {
+        return adminClient;
+    }
+
+    InternalTopologyBuilder internalTopologyBuilder() {
+        return builder;
+    };
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 6769f4a..ad16cff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -46,7 +50,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.time.Duration;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
@@ -129,8 +132,9 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     };
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final MockAdminClient adminClient = new MockAdminClient();
     private final StoreChangelogReader changelogReader =
-        new StoreChangelogReader(time, config, logContext, consumer, callback);
+        new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
     @Before
     public void setUp() {
@@ -185,15 +189,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
         EasyMock.replay(stateManager, storeMetadata, store);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L));
-            }
-        };
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
 
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, stateManager);
         changelogReader.restore();
@@ -226,16 +225,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
         EasyMock.replay(stateManager, storeMetadata, store);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
-            }
-        };
         consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
 
         final StoreChangelogReader changelogReader =
-                new StoreChangelogReader(time, config, logContext, consumer, callback);
+                new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, stateManager);
 
@@ -257,15 +251,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
         EasyMock.replay(stateManager, storeMetadata, store);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L));
-            }
-        };
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
 
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, stateManager);
 
@@ -328,16 +317,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
         EasyMock.replay(stateManager, storeMetadata, store);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
-            }
-        };
         consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
 
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, stateManager);
 
@@ -404,15 +388,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
         EasyMock.replay(activeStateManager, storeMetadata, store);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 0L));
-            }
-        };
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L));
 
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, activeStateManager);
         changelogReader.restore();
@@ -444,15 +423,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
                     throw new TimeoutException("KABOOM!");
                 }
             }
-
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L));
-            }
         };
 
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
+
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, activeStateManager);
         changelogReader.restore();
@@ -480,15 +456,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
             public long position(final TopicPartition partition) {
                 throw kaboom;
             }
-
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L));
-            }
         };
 
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
+
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, activeStateManager);
 
@@ -502,17 +475,22 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.replay(activeStateManager, storeMetadata, store);
 
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+
+        final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+            public ListOffsetsResult listOffsets(final Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
+                                                 final ListOffsetsOptions options) {
                 if (functionCalled.get()) {
-                    return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L));
+                    return super.listOffsets(topicPartitionOffsets, options);
                 } else {
                     functionCalled.set(true);
                     throw new TimeoutException("KABOOM!");
                 }
             }
+        };
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
 
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
             public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
                 throw new AssertionError("Should not trigger this function");
@@ -520,7 +498,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         };
 
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, activeStateManager);
         changelogReader.restore();
@@ -541,15 +519,17 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
         EasyMock.replay(activeStateManager, storeMetadata, store);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+        final MockAdminClient adminClient = new MockAdminClient() {
             @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+            public ListOffsetsResult listOffsets(final Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
+                                                 final ListOffsetsOptions options) {
                 throw kaboom;
             }
         };
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 0L));
 
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         changelogReader.register(tp, activeStateManager);
 
@@ -576,15 +556,12 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
                     throw new TimeoutException("KABOOM!");
                 }
             }
-
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 20L));
-            }
         };
 
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 20L));
+
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
         changelogReader.setMainConsumer(consumer);
 
         changelogReader.register(tp, stateManager);
@@ -618,17 +595,15 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L));
-            }
-
-            @Override
             public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
                 throw kaboom;
             }
         };
+
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
+
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
         changelogReader.setMainConsumer(consumer);
 
         changelogReader.register(tp, stateManager);
@@ -648,7 +623,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
             }
         };
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, callback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
         final StreamsException thrown = assertThrows(StreamsException.class, changelogReader::clear);
         assertEquals(kaboom, thrown.getCause());
@@ -705,7 +680,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", properties));
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
         changelogReader.setMainConsumer(consumer);
         changelogReader.transitToUpdateStandby();
 
@@ -761,7 +736,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", properties));
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
         changelogReader.setMainConsumer(consumer);
         changelogReader.transitToUpdateStandby();
 
@@ -916,13 +891,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(standbyStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
         EasyMock.replay(activeStateManager, standbyStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L));
-            }
-        };
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, consumer, callback);
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp1, 10L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp2, 10L));
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
         assertEquals(ACTIVE_RESTORING, changelogReader.state());
 
         changelogReader.register(tp, activeStateManager);
@@ -983,14 +955,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
         EasyMock.replay(activeStateManager, storeMetadata, store);
 
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 10L));
-            }
-        };
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
+
         final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, consumer, exceptionCallback);
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, exceptionCallback);
 
         changelogReader.register(tp, activeStateManager);
 
@@ -1052,6 +1020,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
                 null)));
         consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages) + 1));
+        adminClient.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages) + 1));
         consumer.assign(Collections.singletonList(topicPartition));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6b15cfc..c17dd69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -252,7 +252,7 @@ public class StreamThreadTest {
         thread.setStateListener(stateListener);
         assertEquals(thread.state(), StreamThread.State.CREATED);
 
-        final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
+        final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener();
 
         final List<TopicPartition> revokedPartitions;
         final List<TopicPartition> assignedPartitions;
@@ -267,7 +267,7 @@ public class StreamThreadTest {
         // assign single partition
         assignedPartitions = Collections.singletonList(t1p1);
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(assignedPartitions);
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
@@ -582,10 +582,10 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(Collections.singletonMap(task1, assignedPartitions), emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
         thread.runOnce();
 
         // processed one record, punctuated after the first record, and hence num.iterations is still 1
@@ -751,7 +751,7 @@ public class StreamThreadTest {
         final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -764,21 +764,21 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(activeTasks, emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(assignedPartitions);
         final Map<TopicPartition, Long> beginOffsets = new HashMap<>();
         beginOffsets.put(t1p1, 0L);
         beginOffsets.put(t1p2, 0L);
         mockConsumer.updateBeginningOffsets(beginOffsets);
-        thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
+        thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions));
 
         assertEquals(1, clientSupplier.producers.size());
         final Producer<byte[], byte[]> globalProducer = clientSupplier.producers.get(0);
         for (final Task task : thread.activeTasks()) {
             assertSame(globalProducer, ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer());
         }
-        assertSame(clientSupplier.consumer, thread.mainConsumer);
-        assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
+        assertSame(clientSupplier.consumer, thread.mainConsumer());
+        assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer());
     }
 
     @Test
@@ -788,7 +788,7 @@ public class StreamThreadTest {
         final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -801,19 +801,19 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(activeTasks, emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(assignedPartitions);
         final Map<TopicPartition, Long> beginOffsets = new HashMap<>();
         beginOffsets.put(t1p1, 0L);
         beginOffsets.put(t1p2, 0L);
         mockConsumer.updateBeginningOffsets(beginOffsets);
-        thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
+        thread.rebalanceListener().onPartitionsAssigned(new HashSet<>(assignedPartitions));
 
         thread.runOnce();
 
         assertEquals(thread.activeTasks().size(), clientSupplier.producers.size());
-        assertSame(clientSupplier.consumer, thread.mainConsumer);
-        assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
+        assertSame(clientSupplier.consumer, thread.mainConsumer());
+        assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer());
     }
 
     @Test
@@ -828,7 +828,7 @@ public class StreamThreadTest {
             10 * 1000,
             "Thread never started.");
 
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
         thread.taskManager().handleRebalanceStart(Collections.singleton(topic1));
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -852,7 +852,7 @@ public class StreamThreadTest {
         assertEquals(Utils.mkSet(task1, task2), thread.taskManager().activeTaskIds());
         assertEquals(StreamThread.State.PENDING_SHUTDOWN, thread.state());
 
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         TestUtils.waitForCondition(
             () -> thread.state() == StreamThread.State.DEAD,
@@ -873,7 +873,7 @@ public class StreamThreadTest {
             10 * 1000,
             "Thread never started.");
 
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -885,7 +885,7 @@ public class StreamThreadTest {
         activeTasks.put(task2, Collections.singleton(t1p2));
 
         thread.taskManager().handleAssignment(activeTasks, emptyMap());
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         thread.shutdown();
         TestUtils.waitForCondition(
@@ -1064,7 +1064,7 @@ public class StreamThreadTest {
         final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
@@ -1073,7 +1073,7 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
 
-        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
+        thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
     }
 
     @Test
@@ -1088,7 +1088,7 @@ public class StreamThreadTest {
         consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null)));
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -1099,10 +1099,10 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(activeTasks, emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(assignedPartitions);
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce();
         assertThat(thread.activeTasks().size(), equalTo(1));
@@ -1146,7 +1146,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -1157,10 +1157,10 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(activeTasks, emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(assignedPartitions);
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce();
 
@@ -1171,7 +1171,7 @@ public class StreamThreadTest {
         thread.runOnce();
 
         clientSupplier.producers.get(0).commitTransactionException = new ProducerFencedException("Producer is fenced");
-        assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener.onPartitionsRevoked(assignedPartitions));
+        assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener().onPartitionsRevoked(assignedPartitions));
         assertFalse(clientSupplier.producers.get(0).transactionCommitted());
         assertFalse(clientSupplier.producers.get(0).closed());
         assertEquals(1, thread.activeTasks().size());
@@ -1189,7 +1189,7 @@ public class StreamThreadTest {
         consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null)));
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -1200,10 +1200,10 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(activeTasks, emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(assignedPartitions);
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce();
         assertThat(thread.activeTasks().size(), equalTo(1));
@@ -1237,7 +1237,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -1248,10 +1248,10 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(activeTasks, emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(assignedPartitions);
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce();
 
@@ -1261,7 +1261,7 @@ public class StreamThreadTest {
         addRecord(mockConsumer, 0L);
         thread.runOnce();
 
-        thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
+        thread.rebalanceListener().onPartitionsRevoked(assignedPartitions);
         assertTrue(clientSupplier.producers.get(0).transactionCommitted());
         assertFalse(clientSupplier.producers.get(0).closed());
         assertEquals(1, thread.activeTasks().size());
@@ -1298,7 +1298,7 @@ public class StreamThreadTest {
         );
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -1309,10 +1309,10 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(activeTasks, emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(assignedPartitions);
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce();
 
@@ -1356,7 +1356,7 @@ public class StreamThreadTest {
         restoreConsumer.updateBeginningOffsets(offsets);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
@@ -1365,7 +1365,7 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
 
-        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
+        thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
 
         thread.runOnce();
 
@@ -1408,7 +1408,7 @@ public class StreamThreadTest {
         checkpoint.write(Collections.singletonMap(partition2, 5L));
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
 
@@ -1419,7 +1419,7 @@ public class StreamThreadTest {
         thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
         thread.taskManager().tryToCompleteRestoration();
 
-        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
+        thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
 
         thread.runOnce();
 
@@ -1505,7 +1505,7 @@ public class StreamThreadTest {
         final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.emptySet());
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1518,7 +1518,7 @@ public class StreamThreadTest {
 
         clientSupplier.consumer.assign(assignedPartitions);
         clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce();
 
@@ -1600,7 +1600,7 @@ public class StreamThreadTest {
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
+        thread.rebalanceListener().onPartitionsRevoked(assignedPartitions);
         assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1613,7 +1613,7 @@ public class StreamThreadTest {
 
         thread.taskManager().handleAssignment(activeTasks, standbyTasks);
 
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
 
         assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
     }
@@ -1632,8 +1632,9 @@ public class StreamThreadTest {
         internalStreamsBuilder.buildAndOptimizeTopology();
 
         final StreamThread thread = createStreamThread("clientId", config, false);
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
-        final MockConsumer<byte[], byte[]> mockRestoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
+        final MockConsumer<byte[], byte[]> mockRestoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer();
+        final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient();
 
         final TopicPartition topicPartition = new TopicPartition("topic", 0);
         final Set<TopicPartition> topicPartitionSet = Collections.singleton(topicPartition);
@@ -1674,11 +1675,11 @@ public class StreamThreadTest {
         final TopicPartition changelogPartition = new TopicPartition("stream-thread-test-count-changelog", 0);
         final Set<TopicPartition> changelogPartitionSet = Collections.singleton(changelogPartition);
         mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition, 0L));
-        mockRestoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L));
+        mockAdminClient.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L));
 
         mockConsumer.schedulePollTask(() -> {
             thread.setState(StreamThread.State.PARTITIONS_REVOKED);
-            thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet);
+            thread.rebalanceListener().onPartitionsAssigned(topicPartitionSet);
         });
 
         try {
@@ -1686,7 +1687,7 @@ public class StreamThreadTest {
 
             TestUtils.waitForCondition(
                 () -> mockRestoreConsumer.assignment().size() == 1,
-                "Never restore first record");
+                "Never get the assignment");
 
             mockRestoreConsumer.addRecord(new ConsumerRecord<>(
                 "stream-thread-test-count-changelog",
@@ -1771,10 +1772,10 @@ public class StreamThreadTest {
             Collections.singletonMap(task1, assignedPartitions),
             emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
         thread.runOnce();
 
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
@@ -1863,7 +1864,7 @@ public class StreamThreadTest {
 
         consumer.schedulePollTask(() -> {
             thread.setState(StreamThread.State.PARTITIONS_REVOKED);
-            thread.rebalanceListener.onPartitionsLost(assignedPartitions);
+            thread.rebalanceListener().onPartitionsLost(assignedPartitions);
         });
 
         thread.setState(StreamThread.State.STARTING);
@@ -1906,7 +1907,7 @@ public class StreamThreadTest {
 
         consumer.schedulePollTask(() -> {
             thread.setState(StreamThread.State.PARTITIONS_REVOKED);
-            thread.rebalanceListener.onPartitionsRevoked(assignedPartitions);
+            thread.rebalanceListener().onPartitionsRevoked(assignedPartitions);
         });
 
         thread.setState(StreamThread.State.STARTING);
@@ -2108,10 +2109,10 @@ public class StreamThreadTest {
                 assignedPartitions),
             emptyMap());
 
-        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
         thread.runOnce();
 
         final MetricName skippedTotalMetric = metrics.metricName(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a171a46..9dac534 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -135,8 +135,8 @@ public class StreamThreadStateStoreProviderTest {
 
         final StreamsConfig streamsConfig = new StreamsConfig(properties);
         final MockClientSupplier clientSupplier = new MockClientSupplier();
-        configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog");
-        configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog");
+        configureClients(clientSupplier, "applicationId-kv-store-changelog");
+        configureClients(clientSupplier, "applicationId-window-store-changelog");
 
         final InternalTopologyBuilder internalTopologyBuilder = topology.getInternalBuilder(applicationId);
         final ProcessorTopology processorTopology = internalTopologyBuilder.buildTopology();
@@ -364,6 +364,7 @@ public class StreamThreadStateStoreProviderTest {
                 new MockTime(),
                 streamsConfig,
                 logContext,
+                clientSupplier.adminClient,
                 clientSupplier.restoreConsumer,
                 new MockStateRestoreListener()),
             topology.storeToChangelogTopic(), partitions);
@@ -414,8 +415,7 @@ public class StreamThreadStateStoreProviderTest {
         EasyMock.replay(threadMock);
     }
 
-    private void configureRestoreConsumer(final MockClientSupplier clientSupplier,
-                                          final String topic) {
+    private void configureClients(final MockClientSupplier clientSupplier, final String topic) {
         final List<PartitionInfo> partitions = Arrays.asList(
             new PartitionInfo(topic, 0, null, null, null),
             new PartitionInfo(topic, 1, null, null, null)
@@ -432,5 +432,8 @@ public class StreamThreadStateStoreProviderTest {
 
         clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
         clientSupplier.restoreConsumer.updateEndOffsets(offsets);
+
+        clientSupplier.adminClient.updateBeginningOffsets(offsets);
+        clientSupplier.adminClient.updateEndOffsets(offsets);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 641f6e5..880f2cb 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -41,11 +41,10 @@ public class MockClientSupplier implements KafkaClientSupplier {
     private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
 
     private Cluster cluster;
-
     private String applicationId;
 
+    public MockAdminClient adminClient = new MockAdminClient();
     public final List<MockProducer<byte[], byte[]>> producers = new LinkedList<>();
-
     public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
 
@@ -55,11 +54,12 @@ public class MockClientSupplier implements KafkaClientSupplier {
 
     public void setCluster(final Cluster cluster) {
         this.cluster = cluster;
+        this.adminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(-1));
     }
 
     @Override
     public Admin getAdmin(final Map<String, Object> config) {
-        return new MockAdminClient(cluster.nodes(), cluster.nodeById(-1));
+        return adminClient;
     }
 
     @Override
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2ca187b..7615612 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -51,6 +51,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ChangelogRegister;
 import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateManager;
 import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
@@ -64,7 +65,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
-import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
@@ -90,7 +90,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -474,12 +473,7 @@ public class TopologyTestDriver implements Closeable {
                 StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
                 logContext,
                 stateDirectory,
-                new StoreChangelogReader(
-                    mockWallClockTime,
-                    streamsConfig,
-                    logContext,
-                    createRestoreConsumer(processorTopology.storeToChangelogTopic()),
-                    stateRestoreListener),
+                new MockChangelogRegister(),
                 processorTopology.storeToChangelogTopic(),
                 new HashSet<>(partitionsByInputTopic.values())
             );
@@ -1202,6 +1196,20 @@ public class TopologyTestDriver implements Closeable {
         stateDirectory.clean();
     }
 
+    static class MockChangelogRegister implements ChangelogRegister {
+        private final Set<TopicPartition> restoringPartitions = new HashSet<>();
+
+        @Override
+        public void register(final TopicPartition partition, final ProcessorStateManager stateManager) {
+            restoringPartitions.add(partition);
+        }
+
+        @Override
+        public void unregister(final Collection<TopicPartition> partitions) {
+            restoringPartitions.removeAll(partitions);
+        }
+    }
+
     static class MockTime implements Time {
         private final AtomicLong timeMs;
         private final AtomicLong highResTimeNs;
@@ -1239,34 +1247,5 @@ public class TopologyTestDriver implements Closeable {
         public void waitObject(final Object obj, final Supplier<Boolean> condition, final long timeoutMs) {
             throw new UnsupportedOperationException();
         }
-
-    }
-
-    private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
-            @Override
-            public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
-
-            @Override
-            public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
-
-            @Override
-            public synchronized long position(final TopicPartition partition) {
-                return 0L;
-            }
-        };
-
-        // for each store
-        for (final Map.Entry<String, String> storeAndTopic : storeToChangelogTopic.entrySet()) {
-            final String topicName = storeAndTopic.getValue();
-            // Set up the restore-state topic ...
-            // consumer.subscribe(new TopicPartition(topicName, 0));
-            // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
-            final List<PartitionInfo> partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
-            consumer.updatePartitions(topicName, partitionInfos);
-            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
-        }
-        return consumer;
     }
 }


Mime
View raw message