kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10086: Integration test for ensuring warmups are effective (#8818)
Date Thu, 11 Jun 2020 13:40:46 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 91181e7  KAFKA-10086: Integration test for ensuring warmups are effective (#8818)
91181e7 is described below

commit 91181e76f46bfcad4b3ae8ef8b22809542281a7d
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Thu Jun 11 08:31:20 2020 -0500

    KAFKA-10086: Integration test for ensuring warmups are effective (#8818)
    
    Add an integration test for the task assignor.
    * ensure we see proper scale-out behavior with warmups
    * ensure in-memory stores are properly recycled and not restored through the scale-out
process
    
    Fix two bugs revealed by the test:
    
    Bug 1: we can't remove active tasks in the cooperative algorithm, because this causes
their state to get discarded (definitely for in-memory stores, and maybe for persistent ones,
depending on the state cleaner). Instead, we convert them to standbys so they can keep warm.
    
    Bug 2: tasks with only in-memory stores weren't reporting their offset positions
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
---
 .../internals/StreamsPartitionAssignor.java        |  75 ++++--
 .../streams/processor/internals/TaskManager.java   |  23 +-
 .../internals/assignment/ClientState.java          |   2 +-
 ...ighAvailabilityTaskAssignorIntegrationTest.java | 280 +++++++++++++++++++++
 .../integration/TaskAssignorIntegrationTest.java   |  13 +-
 .../internals/StreamsPartitionAssignorTest.java    |  21 +-
 6 files changed, 367 insertions(+), 47 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 3f2cc87..bc6ec06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -16,11 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -59,11 +54,16 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -957,7 +957,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
             final List<TopicPartition> activePartitionsList = new ArrayList<>();
             final List<TaskId> assignedActiveList = new ArrayList<>();
 
-            final boolean tasksRevoked = populateActiveTaskAndPartitionsLists(
+            final Set<TaskId> activeTasksRemovedPendingRevokation = populateActiveTaskAndPartitionsLists(
                 activePartitionsList,
                 assignedActiveList,
                 consumer,
@@ -967,8 +967,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
                 allOwnedPartitions
             );
 
-            final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
-                buildStandbyTaskMap(standbyTaskAssignments.get(consumer), partitionsForTask);
+            final Map<TaskId, Set<TopicPartition>> standbyTaskMap = buildStandbyTaskMap(
+                    consumer,
+                    standbyTaskAssignments.get(consumer),
+                    activeTasksRemovedPendingRevokation,
+                    partitionsForTask,
+                    clientMetadata.state
+                );
 
             final AssignmentInfo info = new AssignmentInfo(
                 minUserMetadataVersion,
@@ -980,7 +985,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
                 AssignorError.NONE.code()
             );
 
-            if (tasksRevoked) {
+            if (!activeTasksRemovedPendingRevokation.isEmpty()) {
                 // TODO: once KAFKA-10078 is resolved we can leave it to the client to trigger
this rebalance
                 log.info("Requesting followup rebalance be scheduled immediately due to tasks
changing ownership.");
                 info.setNextRebalanceTime(0L);
@@ -1009,17 +1014,16 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
      * Populates the lists of active tasks and active task partitions for the consumer with
a 1:1 mapping between them
      * such that the nth task corresponds to the nth partition in the list. This means tasks
with multiple partitions
      * will be repeated in the list.
-     * @return whether we had to remove any partitions from the assignment in order to wait
for them to be safely revoked
      */
-    private boolean populateActiveTaskAndPartitionsLists(final List<TopicPartition>
activePartitionsList,
-                                                         final List<TaskId> assignedActiveList,
-                                                         final String consumer,
-                                                         final ClientState clientState,
-                                                         final List<TaskId> activeTasksForConsumer,
-                                                         final Map<TaskId, Set<TopicPartition>>
partitionsForTask,
-                                                         final Set<TopicPartition>
allOwnedPartitions) {
+    private Set<TaskId> populateActiveTaskAndPartitionsLists(final List<TopicPartition>
activePartitionsList,
+                                                             final List<TaskId> assignedActiveList,
+                                                             final String consumer,
+                                                             final ClientState clientState,
+                                                             final List<TaskId> activeTasksForConsumer,
+                                                             final Map<TaskId, Set<TopicPartition>>
partitionsForTask,
+                                                             final Set<TopicPartition>
allOwnedPartitions) {
         final List<AssignedPartition> assignedPartitions = new ArrayList<>();
-        boolean needToRevokePartitions = false;
+        final Set<TaskId> removedActiveTasks = new TreeSet<>();
 
         // Build up list of all assigned partition-task pairs
         for (final TaskId taskId : activeTasksForConsumer) {
@@ -1031,12 +1035,20 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
                 // If the partition is new to this consumer but is still owned by another,
remove from the assignment
                 // until it has been revoked and can safely be reassigned according to the
COOPERATIVE protocol
                 if (newPartitionForConsumer && allOwnedPartitions.contains(partition))
{
-                    log.info("Removing task {} from assignment until it is safely revoked
in followup rebalance", taskId);
-                    clientState.unassignActive(taskId);
+                    log.info(
+                        "Removing task {} from {} active assignment until it is safely revoked
in followup rebalance",
+                        taskId,
+                        consumer
+                    );
+                    removedActiveTasks.add(taskId);
+
                     // Clear the assigned partitions list for this task if any partition
can not safely be assigned,
                     // so as not to encode a partial task
                     assignedPartitionsForTask.clear();
-                    needToRevokePartitions = true;
+
+                    // This has no effect on the assignment, as we'll never consult the ClientState
again, but
+                    // it does perform a useful assertion that the task was actually assigned.
+                    clientState.unassignActive(taskId);
                     break;
                 } else {
                     assignedPartitionsForTask.add(new AssignedPartition(taskId, partition));
@@ -1052,18 +1064,33 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
             assignedActiveList.add(partition.taskId);
             activePartitionsList.add(partition.partition);
         }
-        return needToRevokePartitions;
+        return removedActiveTasks;
     }
 
     /**
      * @return map from task id to its assigned partitions for all standby tasks
      */
-    private static Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final
Collection<TaskId> standbys,
-                                                                        final Map<TaskId,
Set<TopicPartition>> partitionsForTask) {
+    private Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(final String
consumer,
+                                                                 final Iterable<TaskId>
standbys,
+                                                                 final Iterable<TaskId>
tasksRevoked,
+                                                                 final Map<TaskId, Set<TopicPartition>>
partitionsForTask,
+                                                                 final ClientState clientState)
{
         final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<>();
         for (final TaskId task : standbys) {
             standbyTaskMap.put(task, partitionsForTask.get(task));
         }
+        for (final TaskId task : tasksRevoked) {
+            log.info(
+                "Adding removed active task {} as a standby for {} until it is safely revoked
in followup rebalance",
+                task,
+                consumer
+            );
+            standbyTaskMap.put(task, partitionsForTask.get(task));
+
+            // This has no effect on the assignment, as we'll never consult the ClientState
again, but
+            // it does perform a useful assertion that it's legal to assign this task
+            // as a standby to this instance
+            clientState.assignStandby(task);
+        }
         return standbyTaskMap;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index fdbaf1c..05349c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.common.utils.Utils.union;
 import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
 
@@ -507,17 +508,24 @@ public class TaskManager {
 
     /**
      * Compute the offset total summed across all stores in a task. Includes offset sum for
any tasks we own the
-     * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}
-     *
-     * @return Map from task id to its total offset summed across all state stores
+     * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}.
+     * Does not include stateless or non-logged tasks.
      */
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        for (final TaskId id : lockedTaskDirectories) {
+        // Not all tasks will create directories, and there may be directories for tasks
we don't currently own,
+        // so we consider all tasks that are either owned or on disk. This includes stateless
tasks, which should
+        // just have an empty changelogOffsets map.
+        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet()))
{
             final Task task = tasks.get(id);
             if (task != null) {
-                taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
+                final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
+                if (changelogOffsets.isEmpty()) {
+                    log.debug("Skipping to encode apparently stateless (or non-logged) offset
sum for task {}", id);
+                } else {
+                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets));
+                }
             } else {
                 final File checkpointFile = stateDirectory.checkpointFileFor(id);
                 try {
@@ -598,13 +606,14 @@ public class TaskManager {
             final long offset = changelogEntry.getValue();
 
 
-            if (offset == Task.LATEST_OFFSET) { // this condition can only be true for active
tasks; never for standby
+            if (offset == Task.LATEST_OFFSET) {
+                // this condition can only be true for active tasks; never for standby
                 // for this case, the offset of all partitions is set to `LATEST_OFFSET`
                 // and we "forward" the sentinel value directly
                 return Task.LATEST_OFFSET;
             } else {
                 if (offset < 0) {
-                    throw new IllegalStateException("Offset should not be negative.");
+                    throw new IllegalStateException("Expected not to get a sentinel offset,
but got: " + changelogEntry);
                 }
                 offsetSum += offset;
                 if (offsetSum < 0) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 616cd42..c98ed9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -174,7 +174,7 @@ public class ClientState {
         return standbyTasks.size();
     }
 
-    void assignStandby(final TaskId task) {
+    public void assignStandby(final TaskId task) {
         assertNotAssigned(task);
         standbyTasks.add(task);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
new file mode 100644
index 0000000..725c805
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.NoRetryException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@Category(IntegrationTest.class)
+public class HighAvailabilityTaskAssignorIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException
{
+        // NB: this test takes at least a minute to run, because it needs a probing rebalance,
and the minimum
+        // value is one minute
+        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+    }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException
{
+        // NB: this test takes at least a minute to run, because it needs a probing rebalance,
and the minimum
+        // value is one minute
+        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)));
+    }
+
+    private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object,
Object, KeyValueStore<Bytes, byte[]>>> materializedFunction) throws InterruptedException
{
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
+        final String inputTopic = "input" + testId;
+        final String storeName = "store" + testId;
+        final String storeChangelog = appId + "-store" + testId + "-changelog";
+        final Set<TopicPartition> changelogTopicPartitions = mkSet(
+            new TopicPartition(storeChangelog, 0),
+            new TopicPartition(storeChangelog, 1)
+        );
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, inputTopic, storeChangelog);
+
+        final ReentrantLock assignmentLock = new ReentrantLock();
+        final AtomicInteger assignmentsCompleted = new AtomicInteger(0);
+        final Map<Integer, Boolean> assignmentsStable = new ConcurrentHashMap<>();
+        final AtomicBoolean assignmentStable = new AtomicBoolean(false);
+        final AssignmentListener assignmentListener =
+            stable -> {
+                assignmentLock.lock();
+                try {
+                    final int thisAssignmentIndex = assignmentsCompleted.incrementAndGet();
+                    assignmentsStable.put(thisAssignmentIndex, stable);
+                    assignmentStable.set(stable);
+                } finally {
+                    assignmentLock.unlock();
+                }
+            };
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(inputTopic, materializedFunction.apply(storeName));
+        final Topology topology = builder.build();
+
+        final Properties producerProperties = mkProperties(
+            mkMap(
+                mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ProducerConfig.ACKS_CONFIG, "all"),
+                mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()),
+                mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
+            )
+        );
+
+        final StringBuilder kiloBuilder = new StringBuilder(1000);
+        for (int i = 0; i < 1000; i++) {
+            kiloBuilder.append('0');
+        }
+        final String kilo = kiloBuilder.toString();
+
+        try (final Producer<String, String> producer = new KafkaProducer<>(producerProperties))
{
+            for (int i = 0; i < 1000; i++) {
+                producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo));
+            }
+        }
+
+        final Properties consumerProperties = mkProperties(
+            mkMap(
+                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()),
+                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
+            )
+        );
+
+
+        try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId,
assignmentListener));
+             final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId,
assignmentListener));
+             final Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties))
{
+            kafkaStreams0.start();
+
+            // wait until all the input records are in the changelog
+            TestUtils.waitForCondition(
+                () -> getChangelogOffsetSum(changelogTopicPartitions, consumer) == 1000,
+                120_000L,
+                () -> "Input records haven't all been written to the changelog: " + getChangelogOffsetSum(changelogTopicPartitions,
consumer)
+            );
+
+            final AtomicLong instance1TotalRestored = new AtomicLong(-1);
+            final AtomicLong instance1NumRestored = new AtomicLong(-1);
+            final CountDownLatch restoreCompleteLatch = new CountDownLatch(1);
+            kafkaStreams1.setGlobalStateRestoreListener(new StateRestoreListener() {
+                @Override
+                public void onRestoreStart(final TopicPartition topicPartition,
+                                           final String storeName,
+                                           final long startingOffset,
+                                           final long endingOffset) {
+                }
+
+                @Override
+                public void onBatchRestored(final TopicPartition topicPartition,
+                                            final String storeName,
+                                            final long batchEndOffset,
+                                            final long numRestored) {
+                    instance1NumRestored.accumulateAndGet(
+                        numRestored,
+                        (prev, restored) -> prev == -1 ? restored : prev + restored
+                    );
+                }
+
+                @Override
+                public void onRestoreEnd(final TopicPartition topicPartition,
+                                         final String storeName,
+                                         final long totalRestored) {
+                    instance1TotalRestored.accumulateAndGet(
+                        totalRestored,
+                        (prev, restored) -> prev == -1 ? restored : prev + restored
+                    );
+                    restoreCompleteLatch.countDown();
+                }
+            });
+            final int assignmentsBeforeScaleOut = assignmentsCompleted.get();
+            kafkaStreams1.start();
+            TestUtils.waitForCondition(
+                () -> {
+                    assignmentLock.lock();
+                    try {
+                        if (assignmentsCompleted.get() > assignmentsBeforeScaleOut) {
+                            assertFalseNoRetry(
+                                assignmentsStable.get(assignmentsBeforeScaleOut + 1),
+                                "the first assignment after adding a node should be unstable
while we warm up the state."
+                            );
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } finally {
+                        assignmentLock.unlock();
+                    }
+                },
+                120_000L,
+                "Never saw a first assignment after scale out: " + assignmentsCompleted.get()
+            );
+
+            TestUtils.waitForCondition(
+                assignmentStable::get,
+                120_000L,
+                "Assignment hasn't become stable: " + assignmentsCompleted.get() +
+                    " Note, if this does fail, check and see if the new instance just failed
to catch up within" +
+                    " the probing rebalance interval. A full minute should be long enough
to read ~500 records" +
+                    " in any test environment, but you never know..."
+            );
+
+            restoreCompleteLatch.await();
+            // We should finalize the restoration without having restored any records (because
they're already in
+            // the store). Otherwise, we failed to properly re-use the state from the standby.
+            assertThat(instance1TotalRestored.get(), is(0L));
+            // Belt-and-suspenders check that we never even attempt to restore any records.
+            assertThat(instance1NumRestored.get(), is(-1L));
+        }
+    }
+
+    private void assertFalseNoRetry(final boolean assertion, final String message) {
+        if (assertion) {
+            throw new NoRetryException(
+                new AssertionError(
+                    message
+                )
+            );
+        }
+    }
+
+    private static Properties streamsProperties(final String appId,
+                                                final AssignmentListener configuredAssignmentListener)
{
+        return mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "0"),
+                mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "0"), // make the warmup
catch up completely
+                mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "2"),
+                mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "60000"),
+                mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"),
+                mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName())
+            )
+        );
+    }
+
+    private static long getChangelogOffsetSum(final Set<TopicPartition> changelogTopicPartitions,
+                                              final Consumer<String, String> consumer)
{
+        long sum = 0;
+        final Collection<Long> values = consumer.endOffsets(changelogTopicPartitions).values();
+        for (final Long value : values) {
+            sum += value;
+        }
+        return sum;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 8aa5133..750790f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -26,9 +26,11 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
 import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -76,21 +78,23 @@ public class TaskAssignorIntegrationTest {
 
         final String testId = safeUniqueTestName(getClass(), testName);
         final String appId = "appId_" + testId;
+        final String inputTopic = "input" + testId;
 
-        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input");
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
 
         // Maybe I'm paranoid, but I don't want the compiler deciding that my lambdas are
equal to the identity
         // function and defeating my identity check
         final AtomicInteger compilerDefeatingReference = new AtomicInteger(0);
 
         // the implementation doesn't matter, we're just going to verify the reference.
-        final AssignorConfiguration.AssignmentListener configuredAssignmentListener =
+        final AssignmentListener configuredAssignmentListener =
             stable -> compilerDefeatingReference.incrementAndGet();
 
         final Properties properties = mkObjectProperties(
             mkMap(
                 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
                 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
                 mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"),
                 mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
                 mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
@@ -101,7 +105,7 @@ public class TaskAssignorIntegrationTest {
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        builder.stream("input");
+        builder.stream(inputTopic);
 
         try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties))
{
             kafkaStreams.start();
@@ -126,8 +130,7 @@ public class TaskAssignorIntegrationTest {
 
             final Field assignmentListenerField = StreamsPartitionAssignor.class.getDeclaredField("assignmentListener");
             assignmentListenerField.setAccessible(true);
-            final AssignorConfiguration.AssignmentListener actualAssignmentListener =
-                (AssignorConfiguration.AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor);
+            final AssignmentListener actualAssignmentListener = (AssignmentListener) assignmentListenerField.get(streamsPartitionAssignor);
 
 
             final Field taskAssignorSupplierField = StreamsPartitionAssignor.class.getDeclaredField("taskAssignorSupplier");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 03ab1a7..77d4ebf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -112,6 +112,8 @@ import static org.easymock.EasyMock.expect;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anEmptyMap;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1619,16 +1621,15 @@ public class StreamsPartitionAssignorTest {
 
         // The new consumer's assignment should be empty until c1 has the chance to revoke
its partitions/tasks
         assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList()));
-        assertThat(
-            AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()),
-            equalTo(new AssignmentInfo(
-                LATEST_SUPPORTED_VERSION,
-                emptyList(),
-                emptyMap(),
-                emptyMap(),
-                emptyMap(),
-                0
-            )));
+
+        final AssignmentInfo actualAssignment = AssignmentInfo.decode(assignment.get(CONSUMER_2).userData());
+        assertThat(actualAssignment.version(), is(LATEST_SUPPORTED_VERSION));
+        assertThat(actualAssignment.activeTasks(), empty());
+        // Note we're not asserting anything about standbys. If the assignor gave an active
task to CONSUMER_2, it would
+        // be converted to a standby, but we don't know whether the assignor will do that.
+        assertThat(actualAssignment.partitionsByHost(), anEmptyMap());
+        assertThat(actualAssignment.standbyPartitionByHost(), anEmptyMap());
+        assertThat(actualAssignment.errCode(), is(0));
     }
 
     @Test


Mime
View raw message