kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10500: Allow people to add new StreamThread at runtime (#9615)
Date Fri, 04 Dec 2020 19:22:00 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9ece7fe  KAFKA-10500: Allow people to add new StreamThread at runtime (#9615)
9ece7fe is described below

commit 9ece7fe372230e848920d843af2d0d5c2fbaac4d
Author: Walker Carlson <18128741+wcarlson5@users.noreply.github.com>
AuthorDate: Fri Dec 4 11:21:03 2020 -0800

    KAFKA-10500: Allow people to add new StreamThread at runtime (#9615)
    
    Part of KIP-663.
    
    Reviewers: Bruno Cadonna <bruno@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 146 +++++++++++++++------
 .../kafka/streams/state/internals/ThreadCache.java |   3 +
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  44 +++++++
 .../integration/AdjustStreamThreadCountTest.java   | 111 ++++++++++++++++
 4 files changed, 262 insertions(+), 42 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 44ce307..4fb2b2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -84,6 +84,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
@@ -161,11 +162,20 @@ public class KafkaStreams implements AutoCloseable {
     private final ProcessorTopology taskTopology;
     private final ProcessorTopology globalTaskTopology;
     private final long totalCacheSize;
+    private final StreamStateListener streamStateListener;
+    private final StateRestoreListener delegatingStateRestoreListener;
+    private final Map<Long, StreamThread.State> threadState;
+    private final ArrayList<StreamThreadStateStoreProvider> storeProviders;
+    private final UUID processId;
+    private final KafkaClientSupplier clientSupplier;
+    private final InternalTopologyBuilder internalTopologyBuilder;
 
     GlobalStreamThread globalStreamThread;
     private KafkaStreams.StateListener stateListener;
     private StateRestoreListener globalStateRestoreListener;
     private boolean oldHandler;
+    private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
+    private final Object changeThreadCount = new Object();
 
     // container states
     /**
@@ -397,6 +407,7 @@ public class KafkaStreams implements AutoCloseable {
         final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
         synchronized (stateLock) {
             if (state == State.CREATED) {
+                this.streamsUncaughtExceptionHandler = handler;
                 Objects.requireNonNull(streamsUncaughtExceptionHandler);
                 for (final StreamThread thread : threads) {
                     thread.setStreamsUncaughtExceptionHandler(handler);
@@ -755,7 +766,7 @@ public class KafkaStreams implements AutoCloseable {
         this.config = config;
         this.time = time;
         // The application ID is a required config and hence should always have value
-        final UUID processId = UUID.randomUUID();
+        processId = UUID.randomUUID();
         final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         if (userClientId.length() <= 0) {
@@ -765,7 +776,7 @@ public class KafkaStreams implements AutoCloseable {
         }
         final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
-
+        this.clientSupplier = clientSupplier;
         final MetricConfig metricConfig = new MetricConfig()
             .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -792,7 +803,7 @@ public class KafkaStreams implements AutoCloseable {
         ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state);
         log.info("Kafka Streams version: {}", ClientMetrics.version());
         log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
-
+        this.internalTopologyBuilder = internalTopologyBuilder;
         // re-write the physical topology according to the config
         internalTopologyBuilder.rewriteTopology(config);
 
@@ -820,18 +831,18 @@ public class KafkaStreams implements AutoCloseable {
             throw new TopologyException("Topology has no stream threads and no global threads, " +
                 "must subscribe to at least one source topic or global table.");
         }
-
+        oldHandler = false;
         totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
         final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);
         final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
                 (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
-
+        streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
         try {
             stateDirectory = new StateDirectory(config, time, hasPersistentStores);
         } catch (final ProcessorStateException fatal) {
             throw new StreamsException(fatal);
         }
-        final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
+        delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         GlobalStreamThread.State globalThreadState = null;
         if (hasGlobalTopology) {
             final String globalThreadId = clientId + "-GlobalStreamThread";
@@ -845,7 +856,7 @@ public class KafkaStreams implements AutoCloseable {
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
@@ -853,59 +864,110 @@ public class KafkaStreams implements AutoCloseable {
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
-        for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
-
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
-
-        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
+        streamStateListener = new StreamStateListener(threadState, globalThreadState);
         if (hasGlobalTopology) {
             globalStreamThread.setStateListener(streamStateListener);
         }
-        for (final StreamThread thread : threads) {
-            thread.setStateListener(streamStateListener);
+        for (int i = 1; i <= numStreamThreads; i++) {
+            createStreamThread(cacheSizePerThread, i);
         }
 
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
+            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+            internalTopologyBuilder,
+            config,
+            clientSupplier,
+            adminClient,
+            processId,
+            clientId,
+            streamsMetrics,
+            time,
+            streamsMetadataState,
+            cacheSizePerThread,
+            stateDirectory,
+            delegatingStateRestoreListener,
+            threadIdx,
+            KafkaStreams.this::closeToError,
+            streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (changeThreadCount) {
+            if (isRunningOrRebalancing()) {
+                final int threadIdx = getNextThreadIndex();
+                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+                resizeThreadCache(cacheSizePerThread);
+                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+                synchronized (stateLock) {
+                    if (isRunningOrRebalancing()) {
+                        streamThread.start();
+                        return Optional.of(streamThread.getName());
+                    } else {
+                        streamThread.shutdown();
+                        threads.remove(streamThread);
+                        resizeThreadCache(getCacheSizePerThread(threads.size()));
+                        return Optional.empty();
+                    }
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    private int getNextThreadIndex() {
+        final HashSet<String> names = new HashSet<>();
+        for (final StreamThread streamThread: threads) {
+            names.add(streamThread.getName());
+        }
+        final String baseName = clientId + "-StreamThread-";
+        for (int i = 1; i <= threads.size(); i++) {
+            final String name = baseName + i;
+            if (!names.contains(name)) {
+                return i;
+            }
+        }
+        return threads.size() + 1;
+    }
+
     private long getCacheSizePerThread(final int numStreamThreads) {
         return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0));
     }
 
-    private void resizeThreadCache(final int numStreamThreads) {
-        final long cacheSizePreThread = getCacheSizePerThread(numStreamThreads);
+    private void resizeThreadCache(final long cacheSizePerThread) {
         for (final StreamThread streamThread: threads) {
-            streamThread.resizeCache(cacheSizePreThread);
+            streamThread.resizeCache(cacheSizePerThread);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index af3d767..9e99d3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -76,6 +76,9 @@ public class ThreadCache {
         final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
         maxCacheSizeBytes = newCacheSizeBytes;
         if (shrink) {
+            if (caches.values().isEmpty()) {
+                return;
+            }
             final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
             while (sizeBytes() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b2e3971..20de104 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -78,6 +78,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Executors;
@@ -312,6 +313,9 @@ public class KafkaStreamsTest {
                 StreamThread.State.PARTITIONS_ASSIGNED);
             return null;
         }).anyTimes();
+        thread.resizeCache(EasyMock.anyLong());
+        EasyMock.expectLastCall().anyTimes();
+        EasyMock.expect(thread.getName()).andStubReturn("newThread");
         thread.shutdown();
         EasyMock.expectLastCall().andAnswer(() -> {
             supplier.consumer.close();
@@ -589,6 +593,46 @@ public class KafkaStreamsTest {
     }
 
     @Test
+    public void shouldAddThreadWhenRunning() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.addStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+    }
+
+    @Test
+    public void shouldNotAddThreadWhenCreated() {
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        final int oldSize = streams.threads.size();
+        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+        assertThat(streams.threads.size(), equalTo(oldSize));
+    }
+
+    @Test
+    public void shouldNotAddThreadWhenClosed() {
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        final int oldSize = streams.threads.size();
+        streams.close();
+        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+        assertThat(streams.threads.size(), equalTo(oldSize));
+    }
+
+    @Test
+    public void shouldNotAddThreadWhenError() {
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        final int oldSize = streams.threads.size();
+        streams.start();
+        streamThreadOne.shutdown();
+        streamThreadTwo.shutdown();
+        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+        assertThat(streams.threads.size(), equalTo(oldSize));
+    }
+
+
+    @Test
     public void testCannotStartOnceClosed() {
         final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
new file mode 100644
index 0000000..649fc86
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+            )
+        );
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    @Test
+    public void shouldAddStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+        }
+    }
+}


Mime
View raw message