kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9127: don't create StreamThreads for global-only topology (2.4) (#8616)
Date Fri, 08 May 2020 19:10:27 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 8d54432  KAFKA-9127: don't create StreamThreads for global-only topology (2.4) (#8616)
8d54432 is described below

commit 8d54432437378cad873ef15ca3b4718d7f1c1152
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Fri May 8 12:09:41 2020 -0700

    KAFKA-9127: don't create StreamThreads for global-only topology (2.4) (#8616)
    
    Backports: https://github.com/apache/kafka/pull/8540
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |  40 ++++++--
 .../internals/InternalTopologyBuilder.java         |   4 +
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 101 ++++++++++++++++-----
 .../integration/GlobalKTableIntegrationTest.java   |  18 ++++
 .../integration/utils/IntegrationTestUtils.java    |  27 ++++++
 6 files changed, 156 insertions(+), 36 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 941dd1b..c8678ac 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -175,7 +175,7 @@
               files="StreamsPartitionAssignor.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(ProcessorStateManager|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
+              files="(ProcessorStateManager|InternalTopologyBuilder|KafkaStreams|StreamsPartitionAssignor|StreamThread).java"/>
 
     <suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
               files="Murmur3.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 621bb91..3a22e14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.kstream.KStream;
@@ -48,6 +49,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -480,8 +482,9 @@ public class KafkaStreams implements AutoCloseable {
                     final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
                     globalThreadState = newState;
 
-                    // special case when global thread is dead
-                    if (newState == GlobalStreamThread.State.DEAD) {
+                    if (newState == GlobalStreamThread.State.RUNNING) {
+                        maybeSetRunning();
+                    } else if (newState == GlobalStreamThread.State.DEAD) {
                         if (setState(State.ERROR)) {
                             log.error("Global thread has died. The instance will be in error state and should be closed.");
                         }
@@ -696,28 +699,45 @@ public class KafkaStreams implements AutoCloseable {
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.info("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Topology with no input topics will create no stream threads and no global thread.");
+            throw new TopologyException("Topology has no stream threads and no global threads, " +
+                "must subscribe to at least one source topic or global table.");
+        }
 
         long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
         if (totalCacheSize < 0) {
             totalCacheSize = 0;
             log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
         }
-        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
-        final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
-        final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
-                (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
+
+        final long cacheSizePerThread = totalCacheSize / (threads.length + (hasGlobalTopology ? 1 : 0));
+        final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
+                (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
 
         try {
-            stateDirectory = new StateDirectory(config, time, createStateDirectory);
+            stateDirectory = new StateDirectory(config, time, hasPersistentStores);
         } catch (final ProcessorStateException fatal) {
             throw new StreamsException(fatal);
         }
 
         final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         GlobalStreamThread.State globalThreadState = null;
-        if (globalTaskTopology != null) {
+        if (hasGlobalTopology) {
             final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(
                 globalTaskTopology,
@@ -758,7 +778,7 @@ public class KafkaStreams implements AutoCloseable {
         }
 
         final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
-        if (globalTaskTopology != null) {
+        if (hasGlobalTopology) {
             globalStreamThread.setStateListener(streamStateListener);
         }
         for (final StreamThread thread : threads) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index d28aee8..f11536a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1235,6 +1235,10 @@ public class InternalTopologyBuilder {
         setRegexMatchedTopicToStateStore();
     }
 
+    public boolean hasNoNonGlobalTopology() {
+        return sourceTopicNames.isEmpty() && nodeToSourcePatterns.isEmpty();
+    }
+
     private boolean isGlobalSource(final String nodeName) {
         final NodeFactory nodeFactory = nodeFactories.get(nodeName);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index cd24685..8124e3e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -83,6 +84,7 @@ import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -104,7 +106,7 @@ public class KafkaStreamsTest {
     private MockTime time;
 
     private Properties props;
-
+    
     @Mock
     private StateDirectory stateDirectory;
     @Mock
@@ -317,7 +319,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.close();
 
         Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
@@ -325,7 +327,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.setStateListener(streamsStateListener);
 
         Assert.assertEquals(0, streamsStateListener.numChanges);
@@ -393,7 +395,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.setStateListener(streamsStateListener);
 
         Assert.assertEquals(0, streamsStateListener.numChanges);
@@ -457,7 +459,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = getBuilderWithSource();
         builder.globalTable("anyTopic");
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
@@ -477,7 +479,7 @@ public class KafkaStreamsTest {
     @Test
     public void testStateThreadClose() throws Exception {
         // make sure we have the global state thread running too
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = getBuilderWithSource();
         builder.globalTable("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
 
@@ -514,7 +516,7 @@ public class KafkaStreamsTest {
     @Test
     public void testStateGlobalThreadClose() throws Exception {
         // make sure we have the global state thread running too
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = getBuilderWithSource();
         builder.globalTable("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
 
@@ -542,7 +544,7 @@ public class KafkaStreamsTest {
     public void testInitializesAndDestroysMetricsReporters() {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
 
-        try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
             final int initDiff = newInitCount - oldInitCount;
             assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
@@ -556,7 +558,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void testCloseIsIdempotent() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.close();
         final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
@@ -567,7 +569,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void testCannotStartOnceClosed() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
         streams.close();
         try {
@@ -582,7 +584,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldNotSetGlobalRestoreListenerAfterStarting() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
         try {
             streams.setGlobalStateRestoreListener(null);
@@ -596,7 +598,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
         try {
             streams.setUncaughtExceptionHandler(null);
@@ -608,7 +610,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
         try {
             streams.setStateListener(null);
@@ -620,7 +622,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldAllowCleanupBeforeStartAndAfterClose() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         try {
             streams.cleanUp();
             streams.start();
@@ -632,7 +634,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
         TestUtils.waitForCondition(
             () -> streams.state() == KafkaStreams.State.RUNNING,
@@ -648,46 +650,46 @@ public class KafkaStreamsTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWhenNotRunning() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
     }
 
     @Test
     public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() {
         // do not use mock time so that it can really elapse
-        try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier)) {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
             assertFalse(streams.close(Duration.ofMillis(10L)));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowOnNegativeTimeoutForClose() {
-        try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.close(Duration.ofMillis(-1L));
         }
     }
 
     @Test
     public void shouldNotBlockInCloseForZeroDuration() {
-        try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             // with mock time that does not elapse, close would not return if it ever waits on the state transition
             assertFalse(streams.close(Duration.ZERO));
         }
@@ -720,7 +722,7 @@ public class KafkaStreamsTest {
         builder.table("topic", Materialized.as("store"));
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, RecordingLevel.DEBUG.name());
 
-        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
         }
 
@@ -743,7 +745,7 @@ public class KafkaStreamsTest {
         builder.table("topic", Materialized.as("store"));
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, RecordingLevel.INFO.name());
 
-        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
         }
 
@@ -763,7 +765,7 @@ public class KafkaStreamsTest {
         props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName());
 
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         LogCaptureAppender.unregister(appender);
 
         assertThat(appender.getMessages(), hasItem("stream-client [" + CLIENT_ID + "] "
@@ -845,6 +847,49 @@ public class KafkaStreamsTest {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowTopologyExceptionOnEmptyTopology() {
+        try {
+            new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+            fail("Should have thrown TopologyException");
+        } catch (final TopologyException e) {
+            assertThat(
+                e.getMessage(),
+                equalTo("Invalid topology: Topology has no stream threads and no global threads, " +
+                            "must subscribe to at least one source topic or global table."));
+        }
+    }
+
+    @Test
+    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+        assertThat(streams.threads.length, equalTo(0));
+    }
+
+    @Test
+    public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+        assertThat(streams.threads.length, equalTo(0));
+        assertEquals(streams.state(), KafkaStreams.State.CREATED);
+
+        streams.start();
+        TestUtils.waitForCondition(
+            () -> streams.state() == KafkaStreams.State.RUNNING,
+            "Streams never started, state is " + streams.state());
+
+        streams.close();
+
+        TestUtils.waitForCondition(
+            () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
+            "Streams never stopped.");
+    }
+
     @SuppressWarnings("unchecked")
     private Topology getStatefulTopology(final String inputTopic,
                                          final String outputTopic,
@@ -886,6 +931,12 @@ public class KafkaStreamsTest {
             new MockProcessorSupplier());
         return topology;
     }
+    
+    private StreamsBuilder getBuilderWithSource() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("source-topic");
+        return builder;
+    }
 
     private void startStreamsAndCheckDirExists(final Topology topology,
                                                final boolean shouldFilesExist) throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 0a9148d..e33bdd1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -23,6 +24,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -54,6 +56,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 
@@ -267,6 +271,20 @@ public class GlobalKTableIntegrationTest {
         assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
     }
 
+    @Test
+    public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
+        builder = new StreamsBuilder();
+        globalTable = builder.globalTable(
+            globalTableTopic,
+            Consumed.with(Serdes.Long(), Serdes.String()),
+            Materialized.as(Stores.inMemoryKeyValueStore(globalStore)));
+
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
+
+        kafkaStreams.close();
+    }
+
     private void createTopics() throws Exception {
         streamTopic = "stream-" + testNo;
         globalTableTopic = "globalTable-" + testNo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 4921c4f..29e584c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -809,6 +809,33 @@ public class IntegrationTestUtils {
         }
     }
 
+    /**
+     * Waits for the given {@link KafkaStreams} instances to all be in a  {@link State#RUNNING}
+     * state. Prefer {@link #startApplicationAndWaitUntilRunning(List, Duration)} when possible
+     * because this method uses polling, which can be more error prone and slightly slower.
+     *
+     * @param streamsList the list of streams instances to run.
+     * @param timeout the time to wait for the streams to all be in {@link State#RUNNING} state.
+     */
+    public static void waitForApplicationState(final List<KafkaStreams> streamsList,
+                                               final State state,
+                                               final Duration timeout) throws InterruptedException {
+        retryOnExceptionWithTimeout(timeout.toMillis(), () -> {
+            final Map<KafkaStreams, State> streamsToStates = streamsList
+                .stream()
+                .collect(Collectors.toMap(stream -> stream, KafkaStreams::state));
+
+            final Map<KafkaStreams, State> wrongStateMap = streamsToStates.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() != state)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+            final String reason = String.format("Expected all streams instances in %s to be %s within %d ms, but the following were not: %s",
+                                                streamsList, state, timeout.toMillis(), wrongStateMap);
+            assertThat(reason, wrongStateMap.isEmpty());
+        });
+    }
+
     private static StateListener getStateListener(final KafkaStreams streams) {
         try {
             final Field field = streams.getClass().getDeclaredField("stateListener");


Mime
View raw message