kafka-commits mailing list archives

From vvcep...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10085: correctly compute lag for optimized source changelogs (#8787)
Date Thu, 11 Jun 2020 15:15:56 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 7aba1af  KAFKA-10085: correctly compute lag for optimized source changelogs (#8787)
7aba1af is described below

commit 7aba1afcbadc57978f6491ddc1545601056ee6b0
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Thu Jun 11 07:47:48 2020 -0700

    KAFKA-10085: correctly compute lag for optimized source changelogs (#8787)
    
    Split out the optimized source changelogs and fetch the committed offsets rather than the end offset for task lag computation
    
    Reviewers: John Roesler <vvcephei@apache.org>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  8 ++-
 .../streams/processor/internals/ClientUtils.java   | 63 +++++++++++++---
 .../internals/InternalTopologyBuilder.java         |  7 ++
 .../processor/internals/StoreChangelogReader.java  | 15 ++--
 .../internals/StreamsPartitionAssignor.java        | 84 +++++++++++++++-------
 .../streams/processor/internals/TaskManager.java   |  4 ++
 .../processor/internals/ClientUtilsTest.java       | 71 ++++++++++++------
 .../internals/StreamsPartitionAssignorTest.java    | 83 +++++++++++++++++++++
 8 files changed, 270 insertions(+), 65 deletions(-)
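
The gist of the change: when the assignor sums per-task changelog end offsets to estimate lag, an optimized source changelog (the source topic reused as the changelog) should contribute the group's committed offset, not the broker's log-end offset, since only committed records have actually been processed into the store. A minimal, hypothetical sketch of that per-partition selection (names and signature are illustrative only; the committed logic lives in StreamsPartitionAssignor#computeEndOffsetSumsByTask in the diff below):

    // Illustrative helper only -- mirrors the branch order of computeEndOffsetSumsByTask in the diff.
    static long endOffsetFor(final TopicPartition changelog,
                             final Set<TopicPartition> newlyCreatedChangelogPartitions,
                             final Map<TopicPartition, Long> sourceChangelogEndOffsets,
                             final Map<TopicPartition, ListOffsetsResultInfo> endOffsets) {
        if (newlyCreatedChangelogPartitions.contains(changelog)) {
            return 0L;                                        // just created, nothing to restore yet
        } else if (sourceChangelogEndOffsets.containsKey(changelog)) {
            return sourceChangelogEndOffsets.get(changelog);  // optimized source changelog: committed offset
        } else if (endOffsets.containsKey(changelog)) {
            return endOffsets.get(changelog).offset();        // regular changelog: log-end offset
        } else {
            throw new IllegalStateException("Could not get end offset for " + changelog);
        }
    }

For regular changelogs the log-end offset is the right upper bound because every record appended to the changelog must be restored; for an optimized source changelog, records beyond the committed offset have not yet been processed into the store, so counting them would overstate the task's lag.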

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 263bf9a..b3900c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
@@ -1247,7 +1248,12 @@ public class KafkaStreams implements AutoCloseable {
         }
 
         log.debug("Current changelog positions: {}", allChangelogPositions);
-        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsets(allPartitions, adminClient);
+        final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets;
+        try {
+            allEndOffsets = fetchEndOffsets(allPartitions, adminClient);
+        } catch (final TimeoutException e) {
+            throw new StreamsException("Timed out obtaining end offsets from kafka", e);
+        }
         log.debug("Current end offsets :{}", allEndOffsets);
 
         for (final Map.Entry<TopicPartition, ListOffsetsResultInfo> entry : allEndOffsets.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index 613565d..44d3484 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -21,17 +21,21 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -95,19 +99,60 @@ public class ClientUtils {
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
-                                                                             final Admin adminClient) {
-        final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
+    /**
+     * @throws StreamsException if the consumer throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, Long> fetchCommittedOffsets(final Set<TopicPartition> partitions,
+                                                                  final Consumer<byte[], byte[]> consumer) {
+        if (partitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        final Map<TopicPartition, Long> committedOffsets;
         try {
-            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future =  adminClient.listOffsets(
-                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
-                                                                                        .all();
-            endOffsets = future.get();
+            // those which do not have a committed offset would default to 0
+            committedOffsets = consumer.committed(partitions).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
+        } catch (final TimeoutException e) {
+            LOG.warn("The committed offsets request timed out, try increasing the consumer
client's default.api.timeout.ms", e);
+            throw e;
+        } catch (final KafkaException e) {
+            LOG.warn("The committed offsets request failed.", e);
+            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
+        }
+
+        return committedOffsets;
+    }
 
+    public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> fetchEndOffsetsFuture(final Collection<TopicPartition> partitions,
+                                                                                                final Admin adminClient) {
+        return adminClient.listOffsets(
+            partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
+            .all();
+    }
+
+    /**
+     * A helper method that wraps the {@code Future#get} call and rethrows any thrown exception as a StreamsException
+     * @throws StreamsException if the admin client request throws an exception
+     */
+    public static Map<TopicPartition, ListOffsetsResultInfo> getEndOffsets(final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture) {
+        try {
+            return endOffsetsFuture.get();
         } catch (final RuntimeException | InterruptedException | ExecutionException e) {
-            LOG.warn("listOffsets request failed.", e);
+            LOG.warn("The listOffsets request failed.", e);
             throw new StreamsException("Unable to obtain end offsets from kafka", e);
         }
-        return endOffsets;
+    }
+
+    /**
+     * @throws StreamsException if the admin client request throws an exception
+     */
+    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
+                                                                             final Admin adminClient) {
+        if (partitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        return getEndOffsets(fetchEndOffsetsFuture(partitions, adminClient));
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 4f1bdc6..9844341 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1795,6 +1795,13 @@ public class InternalTopologyBuilder {
             return topicConfigs;
         }
 
+        /**
+         * Returns the topic names for any optimized source changelogs
+         */
+        public Set<String> sourceTopicChangelogs() {
+            return sourceTopics.stream().filter(stateChangelogTopics::containsKey).collect(Collectors.toSet());
+        }
+
         @Override
         public boolean equals(final Object o) {
             if (o instanceof TopicsInfo) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index cecbbd3..ad5cce0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -45,6 +45,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
+
 /**
  * ChangelogReader is created and maintained by the stream thread and used for both updating standby tasks and
  * restoring active tasks. It manages the restore consumer, including its assigned partitions, when to pause / resume
@@ -546,23 +548,14 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-
         final Map<TopicPartition, Long> committedOffsets;
         try {
-            // those do not have a committed offset would default to 0
-            committedOffsets =  mainConsumer.committed(partitions).entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
+            committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
         } catch (final TimeoutException e) {
-            // if it timed out we just retry next time.
+            // if it timed out we just retry next time
             return Collections.emptyMap();
-        } catch (final KafkaException e) {
-            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
         }
-
         lastUpdateOffsetTime = time.milliseconds();
-
         return committedOffsets;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index bc6ec06..9352a3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -23,9 +23,11 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -72,7 +74,8 @@ import java.util.stream.Collectors;
 
 import static java.util.Comparator.comparingLong;
 import static java.util.UUID.randomUUID;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
@@ -630,12 +633,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     }
 
     /**
-     * Resolve changelog topic metadata and create them if necessary. Fills in the changelogsByStatefulTask map
-     * and returns the set of changelogs which were newly created.
+     * Resolve changelog topic metadata and create them if necessary. Fills in the changelogsByStatefulTask map and
+     * the optimizedSourceChangelogs set and returns the set of changelogs which were newly created.
      */
     private Set<String> prepareChangelogTopics(final Map<Integer, TopicsInfo> topicGroups,
                                                final Map<Integer, Set<TaskId>> tasksForTopicGroup,
-                                               final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask) {
+                                               final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask,
+                                               final Set<String> optimizedSourceChangelogs) {
         // add tasks to state change log topic subscribers
         final Map<String, InternalTopicConfig> changelogTopicMetadata = new HashMap<>();
         for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
@@ -671,6 +675,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 topicConfig.setNumberOfPartitions(numPartitions);
                 changelogTopicMetadata.put(topicConfig.name(), topicConfig);
             }
+
+            optimizedSourceChangelogs.addAll(topicsInfo.sourceTopicChangelogs());
         }
 
         final Set<String> newlyCreatedTopics = internalTopicManager.makeReady(changelogTopicMetadata);
@@ -692,11 +698,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);
 
         final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask = new HashMap<>();
-        final Set<String> newlyCreatedChangelogs = prepareChangelogTopics(topicGroups, tasksForTopicGroup, changelogsByStatefulTask);
+        final Set<String> optimizedSourceChangelogs = new HashSet<>();
+        final Set<String> newlyCreatedChangelogs =
+            prepareChangelogTopics(topicGroups, tasksForTopicGroup, changelogsByStatefulTask, optimizedSourceChangelogs);
 
         final Map<UUID, ClientState> clientStates = new HashMap<>();
         final boolean lagComputationSuccessful =
-            populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogsByStatefulTask, newlyCreatedChangelogs);
+            populateClientStatesMap(clientStates,
+                clientMetadataMap,
+                taskForPartition,
+                changelogsByStatefulTask,
+                newlyCreatedChangelogs,
+                optimizedSourceChangelogs
+            );
 
         final Set<TaskId> allTasks = partitionsForTask.keySet();
         final Set<TaskId> statefulTasks = changelogsByStatefulTask.keySet();
@@ -747,7 +761,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                             final Map<UUID, ClientMetadata> clientMetadataMap,
                                             final Map<TopicPartition, TaskId> taskForPartition,
                                             final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask,
-                                            final Set<String> newlyCreatedChangelogs) {
+                                            final Set<String> newlyCreatedChangelogs,
+                                            final Set<String> optimizedSourceChangelogs) {
         boolean fetchEndOffsetsSuccessful;
         Map<TaskId, Long> allTaskEndOffsetSums;
         try {
@@ -756,18 +771,36 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                     .flatMap(Collection::stream)
                     .collect(Collectors.toList());
 
-            final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+            final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+            for (final TopicPartition changelog : allChangelogPartitions) {
+                if (newlyCreatedChangelogs.contains(changelog.topic())) {
+                    newlyCreatedChangelogPartitions.add(changelog);
+                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+                    preexistingSourceChangelogPartitions.add(changelog);
+                } else {
+                    preexistingChangelogPartitions.add(changelog);
+                }
+            }
+
+            // Make the listOffsets request first so it can  fetch the offsets for non-source changelogs
+            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+                fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
 
-            final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+                fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
 
-            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
 
-            allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+                changelogsByStatefulTask,
+                endOffsets,
+                sourceChangelogEndOffsets,
+                newlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
-        } catch (final StreamsException e) {
+        } catch (final StreamsException | TimeoutException e) {
             allTaskEndOffsetSums = changelogsByStatefulTask.keySet().stream().collect(Collectors.toMap(t -> t, t -> UNKNOWN_OFFSET_SUM));
             fetchEndOffsetsSuccessful = false;
         }
@@ -784,13 +817,16 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     }
 
     /**
-     * @param endOffsets the listOffsets result from the adminClient, or null if the request failed
      * @param changelogsByStatefulTask map from stateful task to its set of changelog topic partitions
+     * @param endOffsets the listOffsets result from the adminClient
+     * @param sourceChangelogEndOffsets the end (committed) offsets of optimized source changelogs
+     * @param newlyCreatedChangelogPartitions any changelogs that were just created duringthis assignment
      *
      * @return Map from stateful task to its total end offset summed across all changelog partitions
      */
-    private Map<TaskId, Long> computeEndOffsetSumsByTask(final Map<TopicPartition, ListOffsetsResultInfo> endOffsets,
-                                                         final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask,
+    private Map<TaskId, Long> computeEndOffsetSumsByTask(final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask,
+                                                         final Map<TopicPartition, ListOffsetsResultInfo> endOffsets,
+                                                         final Map<TopicPartition, Long> sourceChangelogEndOffsets,
                                                          final Collection<TopicPartition> newlyCreatedChangelogPartitions) {
         final Map<TaskId, Long> taskEndOffsetSums = new HashMap<>();
         for (final Map.Entry<TaskId, Set<TopicPartition>> taskEntry : changelogsByStatefulTask.entrySet()) {
@@ -802,13 +838,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 final long changelogEndOffset;
                 if (newlyCreatedChangelogPartitions.contains(changelog)) {
                     changelogEndOffset = 0L;
+                } else if (sourceChangelogEndOffsets.containsKey(changelog)) {
+                    changelogEndOffset = sourceChangelogEndOffsets.get(changelog);
+                } else if (endOffsets.containsKey(changelog)) {
+                    changelogEndOffset = endOffsets.get(changelog).offset();
                 } else {
-                    final ListOffsetsResultInfo offsetResult = endOffsets.get(changelog);
-                    if (offsetResult == null) {
-                        log.debug("Fetched end offsets did not contain the changelog {} of
task {}", changelog, task);
-                        throw new IllegalStateException("Could not get end offset for " +
changelog);
-                    }
-                    changelogEndOffset = offsetResult.offset();
+                    log.debug("Fetched offsets did not contain the changelog {} of task {}",
changelog, task);
+                    throw new IllegalStateException("Could not get end offset for " + changelog);
                 }
                 final long newEndOffsetSum = taskEndOffsetSums.get(task) + changelogEndOffset;
                 if (newEndOffsetSum < 0) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 05349c4..460796e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -119,6 +119,10 @@ public class TaskManager {
         this.mainConsumer = mainConsumer;
     }
 
+    Consumer<byte[], byte[]> mainConsumer() {
+        return mainConsumer;
+    }
+
     public UUID processId() {
         return processId;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
index 6d9d8df..a6c5e3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
@@ -16,18 +16,15 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import static java.util.Collections.emptyList;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertThrows;
-
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -35,59 +32,93 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.easymock.EasyMock;
 import org.junit.Test;
 
+import static java.util.Collections.emptySet;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
+import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
 public class ClientUtilsTest {
 
+    private static final Set<TopicPartition> PARTITIONS = mkSet(
+        new TopicPartition("topic", 1),
+        new TopicPartition("topic", 2)
+    );
+
     @Test
-    public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() {
+    public void fetchCommittedOffsetsShouldRethrowKafkaExceptionAsStreamsException() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
+        expect(consumer.committed(PARTITIONS)).andThrow(new KafkaException());
+        replay(consumer);
+        assertThrows(StreamsException.class, () -> fetchCommittedOffsets(PARTITIONS, consumer));
+    }
+
+    @Test
+    public void fetchCommittedOffsetsShouldRethrowTimeoutException() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
+        expect(consumer.committed(PARTITIONS)).andThrow(new TimeoutException());
+        replay(consumer);
+        assertThrows(TimeoutException.class, () -> fetchCommittedOffsets(PARTITIONS, consumer));
+    }
+
+    @Test
+    public void fetchCommittedOffsetsShouldReturnEmptyMapIfPartitionsAreEmpty() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
+        assertTrue(fetchCommittedOffsets(emptySet(), consumer).isEmpty());
+    }
+
+    @Test
+    public void fetchEndOffsetsShouldReturnEmptyMapIfPartitionsAreEmpty() {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
-        EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException());
-        replay(adminClient);
-        assertThrows(StreamsException.class, () ->  fetchEndOffsets(emptyList(), adminClient));
-        verify(adminClient);
+        assertTrue(fetchEndOffsets(emptySet(), adminClient).isEmpty());
     }
 
     @Test
-    public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws Exception {
+    public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() throws Exception {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
         final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
 
         EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
         EasyMock.expect(result.all()).andStubReturn(allFuture);
-        EasyMock.expect(allFuture.get()).andThrow(new InterruptedException());
+        EasyMock.expect(allFuture.get()).andThrow(new RuntimeException());
         replay(adminClient, result, allFuture);
 
-        assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
+        assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
         verify(adminClient);
     }
 
     @Test
-    public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws Exception {
+    public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws Exception {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
         final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
 
         EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
         EasyMock.expect(result.all()).andStubReturn(allFuture);
-        EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException()));
+        EasyMock.expect(allFuture.get()).andThrow(new InterruptedException());
         replay(adminClient, result, allFuture);
 
-        assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
+        assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
         verify(adminClient);
     }
 
     @Test
-    public void fetchEndOffsetsWithTimeoutShouldRethrowTimeoutExceptionAsStreamsException() throws Exception {
+    public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws Exception {
         final Admin adminClient = EasyMock.createMock(AdminClient.class);
         final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
         final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
 
         EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
         EasyMock.expect(result.all()).andStubReturn(allFuture);
-        EasyMock.expect(allFuture.get()).andThrow(new TimeoutException());
+        EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException()));
         replay(adminClient, result, allFuture);
 
-        assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient));
+        assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
         verify(adminClient);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 77d4ebf..b3d4169 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -21,10 +21,13 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -1870,6 +1873,86 @@ public class StreamsPartitionAssignorTest {
         EasyMock.verify(adminClient);
     }
 
+    @Test
+    public void shouldRequestEndOffsetsForPreexistingChangelogs() {
+        final Set<TopicPartition> changelogs = mkSet(
+            new TopicPartition(APPLICATION_ID + "-store-changelog", 0),
+            new TopicPartition(APPLICATION_ID + "-store-changelog", 1),
+            new TopicPartition(APPLICATION_ID + "-store-changelog", 2)
+        );
+        adminClient = EasyMock.createMock(AdminClient.class);
+        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
+        final KafkaFutureImpl<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = new KafkaFutureImpl<>();
+        allFuture.complete(changelogs.stream().collect(Collectors.toMap(
+            tp -> tp,
+            tp -> {
+                final ListOffsetsResultInfo info = EasyMock.createNiceMock(ListOffsetsResultInfo.class);
+                expect(info.offset()).andStubReturn(Long.MAX_VALUE);
+                EasyMock.replay(info);
+                return info;
+            }))
+        );
+        final Capture<Map<TopicPartition, OffsetSpec>> capturedChangelogs = EasyMock.newCapture();
+
+        expect(adminClient.listOffsets(EasyMock.capture(capturedChangelogs))).andStubReturn(result);
+        expect(result.all()).andReturn(allFuture);
+
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor1");
+
+        subscriptions.put("consumer10",
+            new Subscription(
+                singletonList("topic1"),
+                defaultSubscriptionInfo.encode()
+            ));
+
+        EasyMock.replay(result);
+        configureDefault();
+        overwriteInternalTopicManagerWithMock(false);
+
+        partitionAssignor.assign(metadata, new GroupSubscription(subscriptions));
+
+        EasyMock.verify(adminClient);
+        assertThat(
+            capturedChangelogs.getValue().keySet(),
+            equalTo(changelogs)
+        );
+    }
+
+    @Test
+    public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
+        final Set<TopicPartition> changelogs = mkSet(
+            new TopicPartition(APPLICATION_ID + "-store-changelog", 0),
+            new TopicPartition(APPLICATION_ID + "-store-changelog", 1),
+            new TopicPartition(APPLICATION_ID + "-store-changelog", 2)
+        );
+
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        streamsBuilder.table("topic1", Materialized.as("store"));
+
+        subscriptions.put("consumer10",
+            new Subscription(
+                singletonList("topic1"),
+                defaultSubscriptionInfo.encode()
+            ));
+
+        final Consumer<byte[], byte[]> consumerClient = EasyMock.createMock(Consumer.class);
+
+        createDefaultMockTaskManager();
+        EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient);
+        configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE));
+        overwriteInternalTopicManagerWithMock(false);
+
+        EasyMock.expect(consumerClient.committed(changelogs))
+            .andStubReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE))));
+
+        EasyMock.replay(consumerClient);
+        partitionAssignor.assign(metadata, new GroupSubscription(subscriptions));
+
+        EasyMock.verify(consumerClient);
+    }
+
     private static ByteBuffer encodeFutureSubscription() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
         buf.putInt(LATEST_SUPPORTED_VERSION + 1);

