kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: log4j improvements on assigned tasks and store changelog reader
Date Fri, 06 Oct 2017 22:42:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 23a014052 -> 2427a4476


MINOR: log4j improvements on assigned tasks and store changelog reader

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Xavier Léauté <xavier@confluent.io>,
Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>

Closes #4031 from guozhangwang/KMinor-assigned-task-log4j


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2427a447
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2427a447
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2427a447

Branch: refs/heads/trunk
Commit: 2427a44768f9734179c957b645df0476e7cb6d05
Parents: 23a0140
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Oct 6 15:42:18 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Oct 6 15:42:18 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/AssignedTasks.java     | 8 ++++----
 .../streams/processor/internals/StoreChangelogReader.java    | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2427a447/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 12c3f79..6ab807f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -122,7 +122,7 @@ class AssignedTasks implements RestoringTasks {
             final Map.Entry<TaskId, Task> entry = it.next();
             try {
                 if (!entry.getValue().initialize()) {
-                    log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey());
+                    log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
                     transitionToRunning(entry.getValue(), readyPartitions);
@@ -140,7 +140,7 @@ class AssignedTasks implements RestoringTasks {
         if (restored.isEmpty()) {
             return Collections.emptySet();
         }
-        log.trace("{} partitions restored for {}", taskTypeName, restored);
+        log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
         final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
@@ -153,10 +153,10 @@ class AssignedTasks implements RestoringTasks {
                 if (log.isTraceEnabled()) {
                     final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
                     outstandingPartitions.removeAll(restoredPartitions);
-                    log.trace("partition restoration not complete for {} {} partitions: {}",
+                    log.trace("{} {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}",
                               taskTypeName,
                               task.id(),
-                              task.changelogPartitions());
+                              outstandingPartitions);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2427a447/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index cc298e2..bbe570c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -194,7 +194,7 @@ public class StoreChangelogReader implements ChangelogReader {
     private Collection<TopicPartition> completed() {
         final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
         completed.removeAll(needsRestoring.keySet());
-        log.debug("completed partitions {}", completed);
+        log.trace("The set of restoration completed partitions so far: {}", completed);
         return completed;
     }
 


Mime
View raw message