kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Fix restoring for source KTable
Date Thu, 04 Feb 2016 04:42:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5b5869383 -> d3ff902d6


MINOR: Fix restoring for source KTable

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #860 from guozhangwang/KRestoreChangelog


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d3ff902d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d3ff902d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d3ff902d

Branch: refs/heads/trunk
Commit: d3ff902d6023eff257653a5dfb31f4e482204c44
Parents: 5b58693
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Feb 3 20:42:43 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 3 20:42:43 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/TopologyBuilder.java    | 2 ++
 .../streams/processor/internals/ProcessorStateManager.java     | 6 +++---
 .../streams/processor/internals/StreamPartitionAssignor.java   | 6 +++---
 .../kafka/streams/processor/internals/StandbyTaskTest.java     | 4 ++--
 .../org/apache/kafka/test/ProcessorTopologyTestDriver.java     | 2 +-
 5 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7af377f..785d3e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -495,6 +495,8 @@ public class TopologyBuilder {
                 // if the node is connected to a state, add to the state topics
                 for (StateStoreFactory stateFactory : stateFactories.values()) {
 
+                    // we store the changelog topic here without the job id prefix
+                    // since it is within a single job and is only used for
                     if (stateFactory.isInternal && stateFactory.users.contains(node))
{
                         stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index b90af48..c3bd82a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -168,18 +168,18 @@ public class ProcessorStateManager {
             if (store.persistent())
                 restoreCallbacks.put(topic, stateRestoreCallback);
         } else {
-            restoreActiveState(store, stateRestoreCallback);
+            restoreActiveState(topic, stateRestoreCallback);
         }
     }
 
-    private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback)
{
+    private void restoreActiveState(String topicName, StateRestoreCallback stateRestoreCallback)
{
         // ---- try to restore the state from change-log ---- //
 
         // subscribe to the store's partition
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new IllegalStateException("Restore consumer should have not subscribed
to any partitions beforehand");
         }
-        TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId,
store.name()), getPartition(store.name()));
+        TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName));
         restoreConsumer.assign(Collections.singletonList(storePartition));
 
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index d499534..74770a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -300,11 +300,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         stateChangelogTopicToTaskIds = new HashMap<>();
         internalSourceTopicToTaskIds = new HashMap<>();
         for (TaskId task : partitionsForTask.keySet()) {
-            for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics)
{
-                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
+            for (String topicName : topicGroups.get(task.topicGroupId).stateChangelogTopics)
{
+                Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topicName);
                 if (tasks == null) {
                     tasks = new HashSet<>();
-                    stateChangelogTopicToTaskIds.put(stateName, tasks);
+                    stateChangelogTopicToTaskIds.put(topicName, tasks);
                 }
 
                 tasks.add(task);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index ffcf9ae..fd6f49f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -59,8 +59,8 @@ public class StandbyTaskTest {
     private final String jobId = "test-job";
     private final String storeName1 = "store1";
     private final String storeName2 = "store2";
-    private final String storeChangelogTopicName1 = jobId + "-" + storeName1 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
-    private final String storeChangelogTopicName2 = jobId + "-" + storeName2 + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
+    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId,
storeName1);
+    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId,
storeName2);
 
     private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1,
1);
     private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2,
1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3ff902d/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index af6d51b..5edff28 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -333,7 +333,7 @@ public class ProcessorTopologyTestDriver {
         };
         // For each store name ...
         for (String storeName : storeNames) {
-            String topicName = jobId + "-" + storeName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
+            String topicName = ProcessorStateManager.storeChangelogTopic(jobId, storeName);
             // Set up the restore-state topic ...
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...


Mime
View raw message