kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5525: Streams reset tool should have same console output with or without dry-run
Date Wed, 05 Jul 2017 22:05:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6cae5ec66 -> 3ab0456db


KAFKA-5525: Streams reset tool should have same console output with or without dry-run

Fixed console output to be consistent with/without dry-run option

Author: ppatierno <ppatierno@live.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3443 from ppatierno/kafka-5525


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3ab0456d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3ab0456d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3ab0456d

Branch: refs/heads/trunk
Commit: 3ab0456db839d8959b2846b9a7e4cc28b4c822c7
Parents: 6cae5ec
Author: Paolo Patierno <ppatierno@live.com>
Authored: Wed Jul 5 15:05:05 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 5 15:05:05 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java | 41 +++++++++-----------
 1 file changed, 18 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3ab0456d/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index d2c5e14..55989f4 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -189,13 +189,11 @@ public class StreamsResetter {
             return;
         }
 
-        if (!dryRun) {
-            if (inputTopics.size() != 0) {
-                System.out.println("Seek-to-beginning for input topics " + inputTopics);
-            }
-            if (intermediateTopics.size() != 0) {
-                System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
-            }
+        if (inputTopics.size() != 0) {
+            System.out.println("Seek-to-beginning for input topics " + inputTopics);
+        }
+        if (intermediateTopics.size() != 0) {
+            System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
         }
 
         final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size()
+ intermediateTopics.size());
@@ -278,18 +276,16 @@ public class StreamsResetter {
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
         if (intermediateTopicPartitions.size() > 0) {
+            System.out.println("Following intermediate topics offsets will be reset to end
(for consumer group " + groupId + ")");
+            for (final String topic : intermediateTopics) {
+                if (allTopics.contains(topic)) {
+                    System.out.println("Topic: " + topic);
+                }
+            }
             if (!dryRun) {
                 client.seekToEnd(intermediateTopicPartitions);
-            } else {
-                System.out.println("Following intermediate topics offsets will be reset to
end (for consumer group " + groupId + ")");
-                for (final String topic : intermediateTopics) {
-                    if (allTopics.contains(topic)) {
-                        System.out.println("Topic: " + topic);
-                    }
-                }
             }
         }
-
     }
 
     private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
@@ -299,15 +295,14 @@ public class StreamsResetter {
         final String groupId = options.valueOf(applicationIdOption);
 
         if (inputTopicPartitions.size() > 0) {
+            System.out.println("Following input topics offsets will be reset to beginning
(for consumer group " + groupId + ")");
+            for (final String topic : inputTopics) {
+                if (allTopics.contains(topic)) {
+                    System.out.println("Topic: " + topic);
+                }
+            }
             if (!dryRun) {
                 client.seekToBeginning(inputTopicPartitions);
-            } else {
-                System.out.println("Following input topics offsets will be reset to beginning
(for consumer group " + groupId + ")");
-                for (final String topic : inputTopics) {
-                    if (allTopics.contains(topic)) {
-                        System.out.println("Topic: " + topic);
-                    }
-                }
             }
         }
     }
@@ -350,7 +345,7 @@ public class StreamsResetter {
     }
 
     private void printHelp(OptionParser parser) throws IOException {
-        System.err.println("The Application Reset Tool allows you to quickly reset an application
in order to reprocess "
+        System.err.println("The Streams Reset Tool allows you to quickly reset an application
in order to reprocess "
                 + "its data from scratch.\n"
                 + "* This tool resets offsets of input topics to the earliest available offset
and it skips to the end of "
                 + "intermediate topics (topics used in the through() method).\n"


Mime
View raw message