kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: improve Streams application reset tool to make sure application is down
Date Mon, 22 Aug 2016 22:45:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f153407c4 -> d851ce76f


MINOR: improve Streams application reset tool to make sure application is down

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1764 from mjsax/improveResetTool


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d851ce76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d851ce76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d851ce76

Branch: refs/heads/trunk
Commit: d851ce76fdaa469dcaed7f66340574529d79a3cd
Parents: f153407
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Aug 22 15:45:29 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Aug 22 15:45:29 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/StreamsResetter.java | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d851ce76/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 8e463d1..22b8bd6 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -20,6 +20,7 @@ import joptsimple.OptionException;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
+import kafka.admin.AdminClient;
 import kafka.admin.TopicCommand;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -82,10 +83,18 @@ public class StreamsResetter {
 
         int exitCode = EXIT_CODE_SUCCESS;
 
+        AdminClient adminClient = null;
         ZkUtils zkUtils = null;
         try {
             parseArguments(args);
 
+            adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
+            final String groupId = this.options.valueOf(applicationIdOption);
+            if (adminClient.describeConsumerGroup(groupId).get().size() != 0) {
+                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
+                    "Make sure to stop all running application instances before running the reset tool.");
+            }
+
             zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
                 30000,
                 30000,
@@ -97,10 +106,13 @@ public class StreamsResetter {
             resetInputAndInternalTopicOffsets();
             seekToEndIntermediateTopics();
             deleteInternalTopics(zkUtils);
-        } catch (final Exception e) {
+        } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
             System.err.println("ERROR: " + e.getMessage());
         } finally {
+            if (adminClient != null) {
+                adminClient.close();
+            }
             if (zkUtils != null) {
                 zkUtils.close();
             }


Mime
View raw message