kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: improve Streams application reset tool to make sure application is down
Date Sat, 20 Aug 2016 19:07:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 afb65688a -> 45259d11a


MINOR: improve Streams application reset tool to make sure application is down

guozhangwang miguno dguy enothereska hjafarpour
See #1764

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1765 from mjsax/improveResetTool-0.10.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45259d11
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45259d11
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45259d11

Branch: refs/heads/0.10.0
Commit: 45259d11aa77bda708a2e9422c3dc3899668965d
Parents: afb6568
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Sat Aug 20 12:06:58 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Aug 20 12:06:58 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/StreamsResetter.java | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45259d11/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 8e463d1..8d9cd5e 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -20,6 +20,7 @@ import joptsimple.OptionException;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
+import kafka.admin.AdminClient;
 import kafka.admin.TopicCommand;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -82,10 +83,18 @@ public class StreamsResetter {
 
         int exitCode = EXIT_CODE_SUCCESS;
 
+        AdminClient adminClient = null;
         ZkUtils zkUtils = null;
         try {
             parseArguments(args);
 
+            adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
+            final String groupId = this.options.valueOf(applicationIdOption);
+            if (adminClient.describeConsumerGroup(groupId).size() != 0) {
+                throw new IllegalStateException("Consumer group '" + groupId + "' is still
active. " +
+                    "Make sure to stop all running application instances before running the
reset tool.");
+            }
+
             zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
                 30000,
                 30000,
@@ -97,10 +106,13 @@ public class StreamsResetter {
             resetInputAndInternalTopicOffsets();
             seekToEndIntermediateTopics();
             deleteInternalTopics(zkUtils);
-        } catch (final Exception e) {
+        } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
             System.err.println("ERROR: " + e.getMessage());
         } finally {
+            if (adminClient != null) {
+                adminClient.close();
+            }
             if (zkUtils != null) {
                 zkUtils.close();
             }


Mime
View raw message