kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4819: Expose states for active tasks to public API
Date Tue, 05 Sep 2017 21:12:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d78eb03fa -> facd2c5a8


KAFKA-4819: Expose states for active tasks to public API

Simple implementation of the feature : [KAFKA-4819](https://issues.apache.org/jira/browse/KAFKA-4819)
 KAFKA-4819

This PR adds a new method `threadStates` to public API of `KafkaStreams` which returns all
currently states of running threads and active tasks.

Below is a example for a simple topology consuming from topics; test-p2 and test-p4.

[{"name":"StreamThread-1","state":"RUNNING","activeTasks":[{"id":"0_0", "assignments":["test-p4-0","test-p2-0"],
"consumedOffsetsByPartition":[{"topicPartition":"test-p2-0","offset":"test-p2-0"}]}, {"id":"0_2",
"assignments":["test-p4-2"], "consumedOffsetsByPartition":[]}]}, {"name":"StreamThread-2","state":"RUNNING","activeTasks":[{"id":"0_1",
"assignments":["test-p4-1","test-p2-1"], "consumedOffsetsByPartition":[{"topicPartition":"test-p2-1","offset":"test-p2-1"}]},
{"id":"0_3", "assignments":["test-p4-3"], "consumedOffsetsByPartition":[]}]}]

Author: Florian Hussonnois <florian.hussonnois@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #2612 from fhussonnois/KAFKA-4819


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/facd2c5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/facd2c5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/facd2c5a

Branch: refs/heads/trunk
Commit: facd2c5a8c7a4379755955ba3e318bfecde6af39
Parents: d78eb03
Author: Florian Hussonnois <florian.hussonnois@gmail.com>
Authored: Tue Sep 5 14:11:59 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 5 14:11:59 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  21 +++
 .../kafka/streams/processor/TaskMetadata.java   |  74 +++++++++
 .../kafka/streams/processor/ThreadMetadata.java |  93 +++++++++++
 .../processor/internals/StreamThread.java       |  35 ++++
 .../internals/StreamsMetadataState.java         |   2 +-
 .../processor/internals/TaskManager.java        |   4 +
 .../apache/kafka/streams/KafkaStreamsTest.java  |  24 +++
 .../processor/internals/StreamThreadTest.java   | 163 ++++++++++++++++++-
 8 files changed, 410 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/facd2c5a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ed8525c..7698f39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -51,6 +51,7 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.StreamsMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
@@ -811,8 +812,11 @@ public class KafkaStreams {
      * {@link Topology} and {@link StreamsBuilder}).
      *
      * @return A string representation of the Kafka Streams instance.
+     *
+     * @deprecated Use {@link #localThreadsMetadata()} to retrieve runtime information.
      */
     @Override
+    @Deprecated
     public String toString() {
         return toString("");
     }
@@ -824,7 +828,10 @@ public class KafkaStreams {
      *
      * @param indent the top-level indent for each line
      * @return A string representation of the Kafka Streams instance.
+     *
+     * @deprecated Use {@link #localThreadsMetadata()} to retrieve runtime information.
      */
+    @Deprecated
     public String toString(final String indent) {
         final StringBuilder sb = new StringBuilder()
             .append(indent)
@@ -978,4 +985,18 @@ public class KafkaStreams {
         validateIsRunning();
         return queryableStoreProvider.getStore(storeName, queryableStoreType);
     }
+
+    /**
+     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
+     *
+     * @return the set of {@link ThreadMetadata}.
+     */
+    public Set<ThreadMetadata> localThreadsMetadata() {
+        validateIsRunning();
+        final Set<ThreadMetadata> threadMetadata = new HashSet<>();
+        for (StreamThread thread : threads) {
+            threadMetadata.add(thread.threadMetadata());
+        }
+        return threadMetadata;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/facd2c5a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
new file mode 100644
index 0000000..15337a4
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public class TaskMetadata {
+
+    private final String taskId;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    public TaskMetadata(final String taskId,
+                        final Set<TopicPartition> topicPartitions) {
+        this.taskId = taskId;
+        this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
+    }
+
+    public String taskId() {
+        return taskId;
+    }
+
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TaskMetadata that = (TaskMetadata) o;
+        return Objects.equals(taskId, that.taskId) &&
+               Objects.equals(topicPartitions, that.topicPartitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(taskId, topicPartitions);
+    }
+
+    @Override
+    public String toString() {
+        return "TaskMetadata{" +
+                "taskId=" + taskId +
+                ", topicPartitions=" + topicPartitions +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/facd2c5a/streams/src/main/java/org/apache/kafka/streams/processor/ThreadMetadata.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ThreadMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ThreadMetadata.java
new file mode 100644
index 0000000..865d289
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ThreadMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.streams.KafkaStreams;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public class ThreadMetadata {
+
+    private final String threadName;
+
+    private final String threadState;
+
+    private final Set<TaskMetadata> activeTasks;
+
+    private final Set<TaskMetadata> standbyTasks;
+
+    public ThreadMetadata(final String threadName,
+                          final String threadState,
+                          final Set<TaskMetadata> activeTasks,
+                          final Set<TaskMetadata> standbyTasks) {
+        this.threadName = threadName;
+        this.threadState = threadState;
+        this.activeTasks = Collections.unmodifiableSet(activeTasks);
+        this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
+    }
+
+    public String threadState() {
+        return threadState;
+    }
+
+    public String threadName() {
+        return threadName;
+    }
+
+    public Set<TaskMetadata> activeTasks() {
+        return activeTasks;
+    }
+
+    public Set<TaskMetadata> standbyTasks() {
+        return standbyTasks;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ThreadMetadata that = (ThreadMetadata) o;
+        return Objects.equals(threadName, that.threadName) &&
+               Objects.equals(threadState, that.threadState) &&
+               Objects.equals(activeTasks, that.activeTasks) &&
+               Objects.equals(standbyTasks, that.standbyTasks);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(threadName, threadState, activeTasks, standbyTasks);
+    }
+
+    @Override
+    public String toString() {
+        return "ThreadMetadata{" +
+                "threadName=" + threadName +
+                ", threadState=" + threadState +
+                ", activeTasks=" + activeTasks +
+                ", standbyTasks=" + standbyTasks +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/facd2c5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d978f3d..818992e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -43,6 +43,8 @@ import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -196,6 +198,11 @@ public class StreamThread extends Thread implements ThreadDataProvider
{
             }
 
             state = newState;
+            if (newState == State.RUNNING) {
+                updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks());
+            } else {
+                updateThreadMetadata(null, null);
+            }
         }
 
         if (stateListener != null) {
@@ -560,6 +567,8 @@ public class StreamThread extends Thread implements ThreadDataProvider
{
 
     public final String applicationId;
 
+    private volatile ThreadMetadata threadMetadata;
+
     private final static int UNLIMITED_RECORDS = -1;
 
     public StreamThread(final InternalTopologyBuilder builder,
@@ -603,6 +612,7 @@ public class StreamThread extends Thread implements ThreadDataProvider
{
         }
         this.consumer = clientSupplier.getConsumer(consumerConfigs);
         taskManager.setConsumer(consumer);
+        updateThreadMetadata(null, null);
     }
 
     @SuppressWarnings("ConstantConditions")
@@ -1171,4 +1181,29 @@ public class StreamThread extends Thread implements ThreadDataProvider
{
     private void refreshMetadataState() {
         streamsMetadataState.onChange(metadataProvider.getPartitionsByHostState(), metadataProvider.clusterMetadata());
     }
+
+    /**
+     * Return information about the current {@link StreamThread}.
+     *
+     * @return {@link ThreadMetadata}.
+     */
+    public final ThreadMetadata threadMetadata() {
+        return threadMetadata;
+    }
+
+    private void updateThreadMetadata(final Map<TaskId, Task> activeTasks, final Map<TaskId,
Task> standbyTasks) {
+        final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
+        if (activeTasks != null) {
+            for (Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
+                activeTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
+            }
+        }
+        final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
+        if (standbyTasks != null) {
+            for (Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
+                standbyTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
+            }
+        }
+        threadMetadata = new ThreadMetadata(this.getName(), this.state().name(), activeTasksMetadata,
standbyTasksMetadata);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/facd2c5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index f470b74..4ff6468 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -192,7 +192,7 @@ public class StreamsMetadataState {
     /**
      * Respond to changes to the HostInfo -> TopicPartition mapping. Will rebuild the
      * metadata
-     * @param currentState  the current mapping of {@link HostInfo} -> {@link TopicPartition}s
+     * @param currentState       the current mapping of {@link HostInfo} -> {@link TopicPartition}s
      * @param clusterMetadata    the current clusterMetadata {@link Cluster}
      */
     synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> currentState,
final Cluster clusterMetadata) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/facd2c5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 151ef35..2d896fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -239,6 +239,10 @@ class TaskManager {
         return active.runningTaskMap();
     }
 
+    Map<TaskId, Task> standbyTasks() {
+        return standby.runningTaskMap();
+    }
+
     void setConsumer(final Consumer<byte[], byte[]> consumer) {
         this.consumer = consumer;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/facd2c5a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 9e3ab2c..f465654 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -23,12 +23,14 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -45,6 +47,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,6 +56,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
 
 @Category({IntegrationTest.class})
 public class KafkaStreamsTest {
@@ -403,6 +407,26 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void shouldReturnThreadMetadata() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
+        props.setProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
+        assertNotNull(threadMetadata);
+        assertEquals(2, threadMetadata.size());
+        for (ThreadMetadata metadata : threadMetadata) {
+            assertTrue(Utils.mkList("RUNNING", "CREATED").contains(metadata.threadState()));
+            assertEquals(0, metadata.standbyTasks().size());
+            assertEquals(0, metadata.activeTasks().size());
+        }
+        streams.close();
+    }
+
 
     private KafkaStreams createKafkaStreams() {
         final Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/kafka/blob/facd2c5a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a6e8b1f..51e6568 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -37,6 +37,8 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -534,7 +536,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(props);
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
-        final TaskManager taskManager = mockTaskMangerCommit(consumer, 1, 1);
+        final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
 
         StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics,
"", "", Collections.<String, String>emptyMap());
         final StreamThread thread = new StreamThread(internalTopologyBuilder,
@@ -566,7 +568,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(props);
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
-        final TaskManager taskManager = mockTaskMangerCommit(consumer, 1, 0);
+        final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
         StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics,
"", "", Collections.<String, String>emptyMap());
         final StreamThread thread = new StreamThread(internalTopologyBuilder,
@@ -587,7 +589,7 @@ public class StreamThreadTest {
 
         EasyMock.verify(taskManager);
     }
-    
+
 
     @SuppressWarnings("unchecked")
     @Test
@@ -599,7 +601,7 @@ public class StreamThreadTest {
 
         final StreamsConfig config = new StreamsConfig(props);
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
-        final TaskManager taskManager = mockTaskMangerCommit(consumer, 2, 1);
+        final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
 
         StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics,
"", "", Collections.<String, String>emptyMap());
         final StreamThread thread = new StreamThread(internalTopologyBuilder,
@@ -622,7 +624,7 @@ public class StreamThreadTest {
     }
 
     @SuppressWarnings({"ThrowableNotThrown", "unchecked"})
-    private TaskManager mockTaskMangerCommit(final Consumer<byte[], byte[]> consumer,
final int numberOfCommits, final int commits) {
+    private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]> consumer,
final int numberOfCommits, final int commits) {
         final TaskManager taskManager = EasyMock.createMock(TaskManager.class);
         taskManager.setConsumer(EasyMock.anyObject(Consumer.class));
         EasyMock.expectLastCall();
@@ -1008,4 +1010,155 @@ public class StreamThreadTest {
     private StreamThread getStreamThread() {
         return createStreamThread(clientId, config, false);
     }
+
+
+    @Test
+    public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException
{
+        internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
+
+        final TaskId taskId = new TaskId(0, 0);
+        final StreamThread thread = createStreamThread(clientId, config, false);
+
+        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(taskId, task0Assignment);
+        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment));
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread.runOnce(-1);
+
+        ThreadMetadata threadMetadata = thread.threadMetadata();
+        assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
+        assertTrue(threadMetadata.activeTasks().contains(new TaskMetadata(taskId.toString(),
task0Assignment)));
+        assertTrue(threadMetadata.standbyTasks().isEmpty());
+    }
+
+    @Test
+    public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException
{
+        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
+                Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
+                        0,
+                        null,
+                        new Node[0],
+                        new Node[0])));
+
+        final HashMap<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
+        restoreConsumer.updateEndOffsets(offsets);
+        restoreConsumer.updateBeginningOffsets(offsets);
+
+        final TaskId taskId = new TaskId(0, 0);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        final TopicPartition t1 = new TopicPartition("t1", 0);
+        Set<TopicPartition> partitionsT1 = Utils.mkSet(t1);
+        standbyTasks.put(taskId, partitionsT1);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+        thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(activeTasks, standbyTasks));
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(task0Assignment);
+        thread.rebalanceListener.onPartitionsAssigned(null);
+        thread.runOnce(-1);
+
+        ThreadMetadata threadMetadata = thread.threadMetadata();
+        assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
+        assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(taskId.toString(),
partitionsT1)));
+        assertTrue(threadMetadata.activeTasks().isEmpty());
+    }
+
+    @Test
+    public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException
{
+        final StreamThread thread = createStreamThread(clientId, config, false);
+        ThreadMetadata metadata = thread.threadMetadata();
+        assertEquals(StreamThread.State.CREATED.name(), metadata.threadState());
+
+        thread.setState(StreamThread.State.RUNNING);
+        metadata = thread.threadMetadata();
+        assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
+    }
+
+    @Test
+    public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning()
throws InterruptedException {
+        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
+
+        final StreamThread thread = createStreamThread(clientId, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
+                Utils.mkList(
+                        new PartitionInfo("stream-thread-test-count-one-changelog",
+                                0,
+                                null,
+                                new Node[0],
+                                new Node[0]),
+                        new PartitionInfo("stream-thread-test-count-one-changelog",
+                                1,
+                                null,
+                                new Node[0],
+                                new Node[0])
+                        ));
+        final HashMap<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
+        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L);
+        restoreConsumer.updateEndOffsets(offsets);
+        restoreConsumer.updateBeginningOffsets(offsets);
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        final TopicPartition t1p0 = new TopicPartition("t1", 0);
+        Set<TopicPartition> partitionsT1P0 = Utils.mkSet(t1p0);
+        standbyTasks.put(new TaskId(0, 0), partitionsT1P0);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        final TopicPartition t1p1 = new TopicPartition("t1", 1);
+        Set<TopicPartition> partitionsT1P1 = Utils.mkSet(t1p1);
+        activeTasks.put(new TaskId(0, 1), partitionsT1P1);
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+        thread.setThreadMetadataProvider(new StreamPartitionAssignor() {
+            @Override
+            public Map<TaskId, Set<TopicPartition>> standbyTasks() {
+                return standbyTasks;
+            }
+
+            @Override
+            public Map<TaskId, Set<TopicPartition>> activeTasks() {
+                return activeTasks;
+            }
+        });
+
+        thread.setState(StreamThread.State.RUNNING);
+
+        thread.rebalanceListener.onPartitionsRevoked(partitionsT1P0);
+        assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
+
+        clientSupplier.consumer.assign(partitionsT1P1);
+        thread.rebalanceListener.onPartitionsAssigned(partitionsT1P1);
+        assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED);
+        thread.runOnce(-1);
+
+        standbyTasks.clear();
+        activeTasks.clear();
+        standbyTasks.put(new TaskId(0, 1), Utils.mkSet(t1p1));
+        activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1p0));
+
+        assertFalse(thread.threadMetadata().activeTasks().isEmpty());
+        assertFalse(thread.threadMetadata().standbyTasks().isEmpty());
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED);
+    }
+
+    private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State
state) {
+        assertEquals(state.name(), metadata.threadState());
+        assertTrue(metadata.activeTasks().isEmpty());
+        assertTrue(metadata.standbyTasks().isEmpty());
+    }
 }


Mime
View raw message