kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: do not create a StandbyTask if there is no state store in the task
Date Mon, 16 Nov 2015 22:03:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 45e7f7130 -> 4a3d244a2


MINOR: do not create a StandbyTask if there is no state store in the task

guozhangwang
An optimization that may reduce unnecessary polling for standby tasks.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #535 from ymatsuda/remove_empty_standby_task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a3d244a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a3d244a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a3d244a

Branch: refs/heads/trunk
Commit: 4a3d244a2cb95654d18368de6e5b67661d4f4f10
Parents: 45e7f71
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Mon Nov 16 14:09:27 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Nov 16 14:09:27 2015 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/StreamThread.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4a3d244a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index bbaeb14..796e53f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -542,7 +542,11 @@ public class StreamThread extends Thread {
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
-        return new StandbyTask(id, restoreConsumer, topology, config, sensors);
+        if (!topology.stateStoreSuppliers().isEmpty()) {
+            return new StandbyTask(id, restoreConsumer, topology, config, sensors);
+        } else {
+            return null;
+        }
     }
 
     private void addStandbyTasks() {
@@ -550,8 +554,10 @@ public class StreamThread extends Thread {
 
         for (TaskId taskId : partitionGrouper.standbyTasks()) {
             StandbyTask task = createStandbyTask(taskId);
-            standbyTasks.put(taskId, task);
-            checkpointedOffsets.putAll(task.checkpointedOffsets());
+            if (task != null) {
+                standbyTasks.put(taskId, task);
+                checkpointedOffsets.putAll(task.checkpointedOffsets());
+            }
         }
 
         restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));


Mime
View raw message