kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-4008: Module "tools" should not be dependent on "core"
Date Tue, 02 Aug 2016 03:12:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2a9f7af6c -> f7976d2fc


KAFKA-4008: Module "tools" should not be dependent on "core"

Moved the Streams application reset tool from the "tools" module to "core".

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1685 from mjsax/moveResetTool

(cherry picked from commit f2405a73ea2dd4b636832b7f8729fb06a04de1d5)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
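
For code that referenced the class directly (which the new Javadoc below explicitly discourages in favor of the bin/ script), the move only changes the package; a minimal before/after sketch:

    // Before this commit, the class lived in the "tools" module:
    //     import org.apache.kafka.tools.StreamsResetter;
    // After this commit, it lives in "core" under the kafka.tools package:
    import kafka.tools.StreamsResetter;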


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7976d2f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7976d2f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7976d2f

Branch: refs/heads/trunk
Commit: f7976d2fc1793d0f635b42eb4dca3810e40c4cc8
Parents: 2a9f7af
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Aug 1 20:12:22 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Aug 1 20:12:38 2016 -0700

----------------------------------------------------------------------
 bin/kafka-streams-application-reset.sh          |   2 +-
 build.gradle                                    |   2 -
 checkstyle/import-control-core.xml              |   2 +
 checkstyle/import-control.xml                   |   4 +-
 .../main/scala/kafka/tools/StreamsResetter.java | 268 +++++++++++++++++++
 .../integration/ResetIntegrationTest.java       |   2 +-
 .../org/apache/kafka/tools/StreamsResetter.java | 260 ------------------
 7 files changed, 273 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/bin/kafka-streams-application-reset.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-streams-application-reset.sh b/bin/kafka-streams-application-reset.sh
index 26ab766..3363732 100755
--- a/bin/kafka-streams-application-reset.sh
+++ b/bin/kafka-streams-application-reset.sh
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.StreamsResetter "$@"
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.StreamsResetter "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 67ddfdb..ead2520 100644
--- a/build.gradle
+++ b/build.gradle
@@ -641,7 +641,6 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
-    compile project(':core')
     compile project(':clients')
     compile project(':log4j-appender')
     compile libs.argparse4j
@@ -690,7 +689,6 @@ project(':streams') {
     testCompile project(':clients').sourceSets.test.output
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output
-    testCompile project(':tools')
     testCompile libs.junit
 
     testRuntime libs.slf4jlog4j

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/checkstyle/import-control-core.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index d53e9e8..5714bfd 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -53,10 +53,12 @@
   </subpackage>
 
   <subpackage name="tools">
+    <allow pkg="kafka.admin" />
     <allow pkg="kafka.javaapi" />
     <allow pkg="kafka.producer" />
     <allow pkg="kafka.consumer" />
     <allow pkg="joptsimple" />
+    <allow pkg="org.apache.kafka.clients.consumer" />
   </subpackage>
 
   <subpackage name="examples">

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 1052d8e..632b516 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,8 +123,6 @@
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="net.sourceforge.argparse4j" />
     <allow pkg="org.apache.log4j" />
-    <allow pkg="joptsimple" />
-    <allow pkg="kafka" />
   </subpackage>
 
   <subpackage name="streams">
@@ -144,6 +142,7 @@
     <subpackage name="integration">
       <allow pkg="kafka.admin" />
       <allow pkg="kafka.server" />
+      <allow pkg="kafka.tools" />
       <allow pkg="kafka.utils" />
       <allow pkg="kafka.zk" />
       <allow pkg="kafka.log" />
@@ -151,7 +150,6 @@
       <allow pkg="scala.collection" />
       <allow pkg="org.I0Itec.zkclient" />
       <allow pkg="org.hamcrest" />
-      <allow pkg="org.apache.kafka.tools" />
     </subpackage>
 
     <subpackage name="state">

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
new file mode 100644
index 0000000..8e463d1
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import kafka.admin.TopicCommand;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
+ * <p>
+ * <strong>This class is not part of public API. For backward compatibility, use the provided script in "bin/" instead of calling this class directly from your code.</strong>
+ * <p>
+ * Resetting the processing state of an application includes the following actions:
+ * <ol>
+ * <li>setting the application's consumer offsets for input and internal topics to zero</li>
+ * <li>skipping over all intermediate user topics (i.e., "seekToEnd" for consumers of intermediate topics)</li>
+ * <li>deleting any topics created internally by Kafka Streams for this application</li>
+ * </ol>
+ * <p>
+ * Only use this tool if <strong>no</strong> application instance is running. Otherwise, the application will get into an invalid state and crash or produce wrong results.
+ * <p>
+ * If you run multiple application instances, running this tool once is sufficient.
+ * However, you need to call {@code KafkaStreams#cleanUp()} before re-starting any instance (to clean the local state store directory).
+ * Otherwise, your application is in an invalid state.
+ * <p>
+ * User output topics will not be deleted or modified by this tool.
+ * If downstream applications consume intermediate or output topics, it is the user's responsibility to adjust those applications manually if required.
+ */
+@InterfaceStability.Unstable
+public class StreamsResetter {
+    private static final int EXIT_CODE_SUCCESS = 0;
+    private static final int EXIT_CODE_ERROR = 1;
+
+    private static OptionSpec<String> bootstrapServerOption;
+    private static OptionSpec<String> zookeeperOption;
+    private static OptionSpec<String> applicationIdOption;
+    private static OptionSpec<String> inputTopicsOption;
+    private static OptionSpec<String> intermediateTopicsOption;
+
+    private OptionSet options = null;
+    private final Properties consumerConfig = new Properties();
+    private final List<String> allTopics = new LinkedList<>();
+
+    public int run(final String[] args) {
+        return run(args, new Properties());
+    }
+
+    public int run(final String[] args, final Properties config) {
+        this.consumerConfig.clear();
+        this.consumerConfig.putAll(config);
+
+        int exitCode = EXIT_CODE_SUCCESS;
+
+        ZkUtils zkUtils = null;
+        try {
+            parseArguments(args);
+
+            zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
+                30000,
+                30000,
+                JaasUtils.isZkSecurityEnabled());
+
+            this.allTopics.clear();
+            this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+
+            resetInputAndInternalTopicOffsets();
+            seekToEndIntermediateTopics();
+            deleteInternalTopics(zkUtils);
+        } catch (final Exception e) {
+            exitCode = EXIT_CODE_ERROR;
+            System.err.println("ERROR: " + e.getMessage());
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+
+        return exitCode;
+    }
+
+    private void parseArguments(final String[] args) throws IOException {
+        final OptionParser optionParser = new OptionParser();
+        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)")
+            .withRequiredArg()
+            .ofType(String.class)
+            .describedAs("id")
+            .required();
+        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
+            .withRequiredArg()
+            .ofType(String.class)
+            .defaultsTo("localhost:9092")
+            .describedAs("urls");
+        zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:PORT")
+            .withRequiredArg()
+            .ofType(String.class)
+            .defaultsTo("localhost:2181")
+            .describedAs("url");
+        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics")
+            .withRequiredArg()
+            .ofType(String.class)
+            .withValuesSeparatedBy(',')
+            .describedAs("list");
+        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics")
+            .withRequiredArg()
+            .ofType(String.class)
+            .withValuesSeparatedBy(',')
+            .describedAs("list");
+
+        try {
+            this.options = optionParser.parse(args);
+        } catch (final OptionException e) {
+            optionParser.printHelpOn(System.err);
+            throw e;
+        }
+    }
+
+    private void resetInputAndInternalTopicOffsets() {
+        final List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
+
+        if (inputTopics.size() == 0) {
+            System.out.println("No input topics specified.");
+        } else {
+            System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+        }
+
+        final Properties config = new Properties();
+        config.putAll(this.consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        for (final String inTopic : inputTopics) {
+            if (!this.allTopics.contains(inTopic)) {
+                System.out.println("Input topic " + inTopic + " not found. Skipping.");
+            }
+        }
+
+        for (final String topic : this.allTopics) {
+            if (isInputTopic(topic) || isInternalTopic(topic)) {
+                System.out.println("Topic: " + topic);
+
+                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+                    client.subscribe(Collections.singleton(topic));
+                    client.poll(1);
+
+                    final Set<TopicPartition> partitions = client.assignment();
+                    client.seekToBeginning(partitions);
+                    for (final TopicPartition p : partitions) {
+                        client.position(p);
+                    }
+                    client.commitSync();
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Resetting offsets for topic " + topic + " failed.");
+                    throw e;
+                }
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private boolean isInputTopic(final String topic) {
+        return this.options.valuesOf(inputTopicsOption).contains(topic);
+    }
+
+    private void seekToEndIntermediateTopics() {
+        final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
+
+        if (intermediateTopics.size() == 0) {
+            System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
+            return;
+        }
+
+        System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics);
+
+        final Properties config = new Properties();
+        config.putAll(this.consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        for (final String topic : intermediateTopics) {
+            if (this.allTopics.contains(topic)) {
+                System.out.println("Topic: " + topic);
+
+                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+                    client.subscribe(Collections.singleton(topic));
+                    client.poll(1);
+
+                    final Set<TopicPartition> partitions = client.assignment();
+                    client.seekToEnd(partitions);
+                    for (final TopicPartition p : partitions) {
+                        client.position(p);
+                    }
+                    client.commitSync();
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Seek-to-end for topic " + topic + " failed.");
+                    throw e;
+                }
+            } else {
+                System.out.println("Topic " + topic + " not found. Skipping.");
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private void deleteInternalTopics(final ZkUtils zkUtils) {
+        System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption));
+
+        for (final String topic : this.allTopics) {
+            if (isInternalTopic(topic)) {
+                final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
+                    "--zookeeper", this.options.valueOf(zookeeperOption),
+                    "--delete", "--topic", topic});
+                try {
+                    TopicCommand.deleteTopic(zkUtils, commandOptions);
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Deleting topic " + topic + " failed.");
+                    throw e;
+                }
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private boolean isInternalTopic(final String topicName) {
+        return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-")
+            && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
+    }
+
+    public static void main(final String[] args) {
+        System.exit(new StreamsResetter().run(args));
+    }
+
+}
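
Besides main(), the class exposes run(String[], Properties) so callers can drive it in-process, which is how the updated ResetIntegrationTest below uses it. A minimal sketch of that call pattern; the class name, topic names, and connection strings here are illustrative placeholders, not taken from this commit:

    import java.util.Properties;
    import kafka.tools.StreamsResetter;

    public class ResetToolSketch {
        public static void main(final String[] args) {
            // Any extra settings are forwarded to the embedded KafkaConsumer instances.
            final Properties consumerConfig = new Properties();

            final int exitCode = new StreamsResetter().run(
                new String[]{
                    "--application-id", "my-streams-app",      // required
                    "--bootstrap-servers", "localhost:9092",   // default: localhost:9092
                    "--zookeeper", "localhost:2181",           // default: localhost:2181
                    "--input-topics", "my-input-topic",
                    "--intermediate-topics", "my-through-topic"
                },
                consumerConfig);
            System.exit(exitCode);
        }
    }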

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 85aff26..4d13b30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.tools.StreamsResetter;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -37,7 +38,6 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.tools.StreamsResetter;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7976d2f/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
deleted file mode 100644
index 734c15b..0000000
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.tools;
-
-import joptsimple.OptionException;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import kafka.admin.TopicCommand;
-import kafka.utils.ZkUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
- * <p>
- * Resetting the processing state of an application includes the following actions:
- * <ol>
- * <li>setting the application's consumer offsets for input and internal topics to zero</li>
- * <li>skipping over all intermediate user topics (i.e., "seekToEnd" for consumers of intermediate topics)</li>
- * <li>deleting any topics created internally by Kafka Streams for this application</li>
- * </ol>
- * <p>
- * Only use this tool if <strong>no</strong> application instance is running. Otherwise, the application will get into an invalid state and crash or produce wrong results.
- * <p>
- * If you run multiple application instances, running this tool once is sufficient.
- * However, you need to call {@code KafkaStreams#cleanUp()} before re-starting any instance (to clean the local state store directory).
- * Otherwise, your application is in an invalid state.
- * <p>
- * User output topics will not be deleted or modified by this tool.
- * If downstream applications consume intermediate or output topics, it is the user's responsibility to adjust those applications manually if required.
- */
-public class StreamsResetter {
-    private static final int EXIT_CODE_SUCCESS = 0;
-    private static final int EXIT_CODE_ERROR = 1;
-
-    private static OptionSpec<String> bootstrapServerOption;
-    private static OptionSpec<String> zookeeperOption;
-    private static OptionSpec<String> applicationIdOption;
-    private static OptionSpec<String> inputTopicsOption;
-    private static OptionSpec<String> intermediateTopicsOption;
-
-    private OptionSet options = null;
-    private final Properties consumerConfig = new Properties();
-    private final List<String> allTopics = new LinkedList<>();
-
-    public int run(final String[] args) {
-        return run(args, new Properties());
-    }
-
-    public int run(final String[] args, final Properties config) {
-        this.consumerConfig.clear();
-        this.consumerConfig.putAll(config);
-
-        int exitCode = EXIT_CODE_SUCCESS;
-
-        ZkUtils zkUtils = null;
-        try {
-            parseArguments(args);
-
-            zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
-                30000,
-                30000,
-                JaasUtils.isZkSecurityEnabled());
-
-            this.allTopics.clear();
-            this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-
-            resetInputAndInternalTopicOffsets();
-            seekToEndIntermediateTopics();
-            deleteInternalTopics(zkUtils);
-        } catch (final Exception e) {
-            exitCode = EXIT_CODE_ERROR;
-            System.err.println("ERROR: " + e.getMessage());
-        } finally {
-            if (zkUtils != null) {
-                zkUtils.close();
-            }
-        }
-
-        return exitCode;
-    }
-
-    private void parseArguments(final String[] args) throws IOException {
-        final OptionParser optionParser = new OptionParser();
-        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)")
-            .withRequiredArg()
-            .ofType(String.class)
-            .describedAs("id")
-            .required();
-        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
-            .withRequiredArg()
-            .ofType(String.class)
-            .defaultsTo("localhost:9092")
-            .describedAs("urls");
-        zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:PORT")
-            .withRequiredArg()
-            .ofType(String.class)
-            .defaultsTo("localhost:2181")
-            .describedAs("url");
-        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics")
-            .withRequiredArg()
-            .ofType(String.class)
-            .withValuesSeparatedBy(',')
-            .describedAs("list");
-        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics")
-            .withRequiredArg()
-            .ofType(String.class)
-            .withValuesSeparatedBy(',')
-            .describedAs("list");
-
-        try {
-            this.options = optionParser.parse(args);
-        } catch (final OptionException e) {
-            optionParser.printHelpOn(System.err);
-            throw e;
-        }
-    }
-
-    private void resetInputAndInternalTopicOffsets() {
-        final List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
-
-        if (inputTopics.size() == 0) {
-            System.out.println("No input topics specified.");
-        } else {
-            System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
-        }
-
-        final Properties config = new Properties();
-        config.putAll(this.consumerConfig);
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
-        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        for (final String inTopic : inputTopics) {
-            if (!this.allTopics.contains(inTopic)) {
-                System.out.println("Input topic " + inTopic + " not found. Skipping.");
-            }
-        }
-
-        for (final String topic : this.allTopics) {
-            if (isInputTopic(topic) || isInternalTopic(topic)) {
-                System.out.println("Topic: " + topic);
-
-                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-                    client.subscribe(Collections.singleton(topic));
-                    client.poll(1);
-
-                    final Set<TopicPartition> partitions = client.assignment();
-                    client.seekToBeginning(partitions);
-                    for (final TopicPartition p : partitions) {
-                        client.position(p);
-                    }
-                    client.commitSync();
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Resetting offsets for topic " + topic + " failed.");
-                    throw e;
-                }
-            }
-        }
-
-        System.out.println("Done.");
-    }
-
-    private boolean isInputTopic(final String topic) {
-        return this.options.valuesOf(inputTopicsOption).contains(topic);
-    }
-
-    private void seekToEndIntermediateTopics() {
-        final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
-
-        if (intermediateTopics.size() == 0) {
-            System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
-            return;
-        }
-
-        System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics);
-
-        final Properties config = new Properties();
-        config.putAll(this.consumerConfig);
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
-        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        for (final String topic : intermediateTopics) {
-            if (this.allTopics.contains(topic)) {
-                System.out.println("Topic: " + topic);
-
-                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-                    client.subscribe(Collections.singleton(topic));
-                    client.poll(1);
-
-                    final Set<TopicPartition> partitions = client.assignment();
-                    client.seekToEnd(partitions);
-                    for (final TopicPartition p : partitions) {
-                        client.position(p);
-                    }
-                    client.commitSync();
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Seek-to-end for topic " + topic + " failed.");
-                    throw e;
-                }
-            } else {
-                System.out.println("Topic " + topic + " not found. Skipping.");
-            }
-        }
-
-        System.out.println("Done.");
-    }
-
-    private void deleteInternalTopics(final ZkUtils zkUtils) {
-        System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption));
-
-        for (final String topic : this.allTopics) {
-            if (isInternalTopic(topic)) {
-                final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
-                    "--zookeeper", this.options.valueOf(zookeeperOption),
-                    "--delete", "--topic", topic});
-                try {
-                    TopicCommand.deleteTopic(zkUtils, commandOptions);
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Deleting topic " + topic + " failed.");
-                    throw e;
-                }
-            }
-        }
-
-        System.out.println("Done.");
-    }
-
-    private boolean isInternalTopic(final String topicName) {
-        return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-")
-            && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
-    }
-
-    public static void main(final String[] args) {
-        System.exit(new StreamsResetter().run(args));
-    }
-
-}

