kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-3185: Allow users to cleanup internal Kafka Streams data
Date Wed, 27 Jul 2016 21:11:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d58c2d9ac -> 8deedcacb


KAFKA-3185: Allow users to cleanup internal Kafka Streams data

- added Kafka Streams Application Reset Tool

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang, Michael G. Noll

Closes #1636 from mjsax/kafka-3185
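
For orientation, a minimal, hypothetical sketch of the reset flow this patch introduces (application id, topic names, and broker/ZooKeeper addresses below are placeholders; the usual entry point is the new bin/kafka-streams-application-reset.sh wrapper, which forwards its arguments to org.apache.kafka.tools.StreamsResetter):

    // 1) With all application instances stopped, reset the application's
    //    committed offsets and delete its internal changelog/repartition topics.
    final int exitCode = new StreamsResetter().run(new String[]{
        "--application-id", "my-streams-app",      // placeholder application.id
        "--bootstrap-servers", "localhost:9092",
        "--zookeeper", "localhost:2181",
        "--input-topics", "inputTopic",
        "--intermediate-topics", "userTopic"
    });

    // 2) On each instance, wipe the local state directory (state.dir) for this
    //    application before restarting; cleanUp() may only be called before
    //    start() or after close().
    final KafkaStreams streams = new KafkaStreams(builder, props);
    streams.cleanUp();
    streams.start();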


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8deedcac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8deedcac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8deedcac

Branch: refs/heads/trunk
Commit: 8deedcacb68ff8ebae802e086ff706ad7e5f38e9
Parents: d58c2d9
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Jul 27 14:11:40 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 27 14:11:40 2016 -0700

----------------------------------------------------------------------
 bin/kafka-streams-application-reset.sh          |  21 ++
 build.gradle                                    |   2 +
 checkstyle/import-control.xml                   |   3 +
 .../org/apache/kafka/streams/KafkaStreams.java  |  96 ++++---
 .../apache/kafka/streams/KafkaStreamsTest.java  | 129 +++++++--
 .../integration/ResetIntegrationTest.java       | 255 ++++++++++++++++++
 .../utils/EmbeddedSingleNodeKafkaCluster.java   |   3 +-
 .../org/apache/kafka/tools/StreamsResetter.java | 260 +++++++++++++++++++
 8 files changed, 709 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8deedcac/bin/kafka-streams-application-reset.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-streams-application-reset.sh b/bin/kafka-streams-application-reset.sh
new file mode 100755
index 0000000..26ab766
--- /dev/null
+++ b/bin/kafka-streams-application-reset.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.StreamsResetter "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/8deedcac/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index ead2520..67ddfdb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -641,6 +641,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+    compile project(':core')
     compile project(':clients')
     compile project(':log4j-appender')
     compile libs.argparse4j
@@ -689,6 +690,7 @@ project(':streams') {
     testCompile project(':clients').sourceSets.test.output
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output
+    testCompile project(':tools')
     testCompile libs.junit
 
     testRuntime libs.slf4jlog4j

http://git-wip-us.apache.org/repos/asf/kafka/blob/8deedcac/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9a099d0..1052d8e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,6 +123,8 @@
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="net.sourceforge.argparse4j" />
     <allow pkg="org.apache.log4j" />
+    <allow pkg="joptsimple" />
+    <allow pkg="kafka" />
   </subpackage>
 
   <subpackage name="streams">
@@ -149,6 +151,7 @@
       <allow pkg="scala.collection" />
       <allow pkg="org.I0Itec.zkclient" />
       <allow pkg="org.hamcrest" />
+      <allow pkg="org.apache.kafka.tools" />
     </subpackage>
 
     <subpackage name="state">

http://git-wip-us.apache.org/repos/asf/kafka/blob/8deedcac/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 8f8cfa7..b9553c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -49,7 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * The computational logic can be specified either by using the {@link TopologyBuilder} class to define the a DAG topology of
  * {@link org.apache.kafka.streams.processor.Processor}s or by using the {@link org.apache.kafka.streams.kstream.KStreamBuilder}
  * class which provides the high-level {@link org.apache.kafka.streams.kstream.KStream} DSL to define the transformation.
- *
+ * <p>
  * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or
  * more threads specified in the configs for the processing work.
  * <p>
@@ -62,7 +64,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
  * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
  * <p>
- *
+ * <p>
  * A simple example might look like this:
  * <pre>
  *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
@@ -78,7 +80,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  *    KafkaStreams streams = new KafkaStreams(builder, config);
  *    streams.start();
  * </pre>
- *
  */
 
 @InterfaceStability.Unstable
@@ -104,42 +105,46 @@ public class KafkaStreams {
     // usage only and should not be exposed to users at all.
     private final UUID processId;
 
+    private final StreamsConfig config;
+
     /**
      * Construct the stream instance.
      *
-     * @param builder  the processor topology builder specifying the computational logic
-     * @param props    properties for the {@link StreamsConfig}
+     * @param builder the processor topology builder specifying the computational logic
+     * @param props   properties for the {@link StreamsConfig}
      */
-    public KafkaStreams(TopologyBuilder builder, Properties props) {
+    public KafkaStreams(final TopologyBuilder builder, final Properties props) {
         this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
     }
 
     /**
      * Construct the stream instance.
      *
-     * @param builder  the processor topology builder specifying the computational logic
-     * @param config   the stream configs
+     * @param builder the processor topology builder specifying the computational logic
+     * @param config  the stream configs
      */
-    public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
+    public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config) {
         this(builder, config, new DefaultKafkaClientSupplier());
     }
 
     /**
      * Construct the stream instance.
      *
-     * @param builder         the processor topology builder specifying the computational logic
-     * @param config          the stream configs
-     * @param clientSupplier  the kafka clients supplier which provides underlying producer and consumer clients
-     * for this {@link KafkaStreams} instance
+     * @param builder        the processor topology builder specifying the computational logic
+     * @param config         the stream configs
+     * @param clientSupplier the kafka clients supplier which provides underlying producer and consumer clients
+     *                       for this {@link KafkaStreams} instance
      */
-    public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier) {
+    public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier) {
         // create the metrics
-        Time time = new SystemTime();
+        final Time time = new SystemTime();
 
         this.processId = UUID.randomUUID();
 
+        this.config = config;
+
         // The application ID is a required config and hence should always have value
-        String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+        final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
         builder.setApplicationId(applicationId);
 
@@ -147,11 +152,11 @@ public class KafkaStreams {
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
 
-        List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
+        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+            MetricsReporter.class);
         reporters.add(new JmxReporter(JMX_PREFIX));
 
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+        final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                 TimeUnit.MILLISECONDS);
 
@@ -160,7 +165,7 @@ public class KafkaStreams {
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
+            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, this.processId, this.metrics, time);
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
 
@@ -169,19 +174,20 @@ public class KafkaStreams {
 
     /**
      * Start the stream instance by starting all its threads.
+     *
      * @throws IllegalStateException if process was already started
      */
     public synchronized void start() {
         log.debug("Starting Kafka Stream process");
 
-        if (state == CREATED) {
-            for (StreamThread thread : threads)
+        if (this.state == CREATED) {
+            for (final StreamThread thread : this.threads)
                 thread.start();
 
-            state = RUNNING;
+            this.state = RUNNING;
 
             log.info("Started Kafka Stream process");
-        } else if (state == RUNNING) {
+        } else if (this.state == RUNNING) {
             throw new IllegalStateException("This process was already started.");
         } else {
             throw new IllegalStateException("Cannot restart after closing.");
@@ -191,28 +197,29 @@ public class KafkaStreams {
     /**
      * Shutdown this stream instance by signaling all the threads to stop,
      * and then wait for them to join.
+     *
      * @throws IllegalStateException if process has not started yet
      */
     public synchronized void close() {
         log.debug("Stopping Kafka Stream process");
 
-        if (state == RUNNING) {
+        if (this.state == RUNNING) {
             // signal the threads to stop and wait
-            for (StreamThread thread : threads)
+            for (final StreamThread thread : this.threads)
                 thread.close();
 
-            for (StreamThread thread : threads) {
+            for (final StreamThread thread : this.threads) {
                 try {
                     thread.join();
-                } catch (InterruptedException ex) {
+                } catch (final InterruptedException ex) {
                     Thread.interrupted();
                 }
             }
         }
 
-        if (state != STOPPED) {
-            metrics.close();
-            state = STOPPED;
+        if (this.state != STOPPED) {
+            this.metrics.close();
+            this.state = STOPPED;
             log.info("Stopped Kafka Stream process");
         }
 
@@ -235,12 +242,35 @@ public class KafkaStreams {
     }
 
     /**
+     * Cleans up local state store directory ({@code state.dir}), by deleting all data with regard to the application-id.
+     * <p>
+     * May only be called either before instance is started or after instance is closed.
+     *
+     * @throws IllegalStateException if instance is currently running
+     */
+    public void cleanUp() {
+        if (this.state == RUNNING) {
+            throw new IllegalStateException("Cannot clean up while running.");
+        }
+
+        final String localApplicationDir = this.config.getString(StreamsConfig.STATE_DIR_CONFIG)
+            + File.separator
+            + this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+
+        log.debug("Clean up local Kafka Streams data in {}", localApplicationDir);
+        log.debug("Removing local Kafka Streams application data in {} for application {}",
+            localApplicationDir,
+            this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
+        Utils.delete(new File(localApplicationDir));
+    }
+
+    /**
      * Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception.
      *
      * @param eh the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
      */
-    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) {
-        for (StreamThread thread : threads)
+    public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
+        for (final StreamThread thread : this.threads)
             thread.setUncaughtExceptionHandler(eh);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8deedcac/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 22d8bf2..f33cb3a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,16 +19,21 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Properties;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class KafkaStreamsTest {
 
     @Test
     public void testStartAndClose() throws Exception {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
@@ -36,71 +41,143 @@ public class KafkaStreamsTest {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        KStreamBuilder builder = new KStreamBuilder();
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
 
         streams.start();
         final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int initCountDifference = newInitCount - oldInitCount;
-        Assert.assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
+        assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
 
         streams.close();
         Assert.assertEquals("each reporter initialized should also be closed",
-                oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
+            oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
     @Test
     public void testCloseIsIdempotent() throws Exception {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
-        KStreamBuilder builder = new KStreamBuilder();
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.close();
         final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
         streams.close();
         Assert.assertEquals("subsequent close() calls should do nothing",
-                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+            closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
-    @Test
+    @Test(expected = IllegalStateException.class)
     public void testCannotStartOnceClosed() throws Exception {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
 
-        KStreamBuilder builder = new KStreamBuilder();
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.close();
 
         try {
             streams.start();
-        } catch (IllegalStateException e) {
+        } catch (final IllegalStateException e) {
             Assert.assertEquals("Cannot restart after closing.", e.getMessage());
-            return;
+            throw e;
+        } finally {
+            streams.close();
         }
-        Assert.fail("should have caught an exception and returned");
     }
 
-    @Test
+    @Test(expected = IllegalStateException.class)
     public void testCannotStartTwice() throws Exception {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
 
-        KStreamBuilder builder = new KStreamBuilder();
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
 
         try {
             streams.start();
-        } catch (IllegalStateException e) {
+        } catch (final IllegalStateException e) {
             Assert.assertEquals("This process was already started.", e.getMessage());
-            return;
+            throw e;
+        } finally {
+            streams.close();
         }
-        Assert.fail("should have caught an exception and returned");
     }
+
+    @Test
+    public void testCleanup() throws Exception {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+
+        streams.cleanUp();
+        streams.start();
+        streams.close();
+        streams.cleanUp();
+    }
+
+    @Test
+    public void testCleanupIsolation() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final String appId1 = "testIsolation-1";
+        final String appId2 = "testIsolation-2";
+        final String stateDir = TestUtils.tempDirectory().getPath();
+        final File stateDirApp1 = new File(stateDir + File.separator + appId1);
+        final File stateDirApp2 = new File(stateDir + File.separator + appId2);
+
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        assertFalse(stateDirApp1.exists());
+        assertFalse(stateDirApp2.exists());
+
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId1);
+        final KafkaStreams streams1 = new KafkaStreams(builder, props);
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId2);
+        final KafkaStreams streams2 = new KafkaStreams(builder, props);
+
+        assertTrue(stateDirApp1.exists());
+        assertTrue(stateDirApp2.exists());
+
+        streams1.cleanUp();
+        assertFalse(stateDirApp1.exists());
+        assertTrue(stateDirApp2.exists());
+
+        streams2.cleanUp();
+        assertFalse(stateDirApp1.exists());
+        assertFalse(stateDirApp2.exists());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testCannotCleanupWhileRunning() throws Exception {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+
+        streams.start();
+        try {
+            streams.cleanUp();
+        } catch (final IllegalStateException e) {
+            Assert.assertEquals("Cannot clean up while running.", e.getMessage());
+            throw e;
+        } finally {
+            streams.close();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8deedcac/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
new file mode 100644
index 0000000..bffdfd9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.StreamsResetter;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+/**
+ * Tests local state store and global application cleanup.
+ */
+public class ResetIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+
+    private static final String APP_ID = "cleanup-integration-test";
+    private static final String INPUT_TOPIC = "inputTopic";
+    private static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
+
+    private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
+    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(INPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2);
+        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
+    }
+
+    @Test
+    public void testReprocessingFromScratchAfterCleanUp() throws Exception {
+        final Properties streamsConfiguration = prepareTest();
+        final Properties resultTopicConsumerConfig = prepareResultConsumer();
+
+        prepareInputData();
+        final KStreamBuilder builder = setupTopology();
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
+        // receive only first values to make sure intermediate user topic is not consumed completely
+        // => required to test "seekToEnd" for intermediate topics
+        final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0);
+
+        streams.close();
+
+        // RESET
+        Utils.sleep(STREAMS_CONSUMER_TIMEOUT);
+        streams.cleanUp();
+        cleanGlobal();
+        assertInternalTopicsGotDeleted();
+        Utils.sleep(CLEANUP_CONSUMER_TIMEOUT);
+
+        // RE-RUN
+        streams = new KafkaStreams(setupTopology(), streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
+        final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0);
+        streams.close();
+
+        assertThat(resultRerun, equalTo(result));
+        assertThat(resultRerun2, equalTo(result2));
+    }
+
+    private Properties prepareTest() throws Exception {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        return streamsConfiguration;
+    }
+
+    private Properties prepareResultConsumer() {
+        final Properties resultTopicConsumerConfig = new Properties();
+        resultTopicConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        resultTopicConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-standard-consumer-" + OUTPUT_TOPIC);
+        resultTopicConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultTopicConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultTopicConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+
+        return resultTopicConsumerConfig;
+    }
+
+    private void prepareInputData() throws Exception {
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, 10L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, 20L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, 30L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, 40L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, 50L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, 60L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, 61L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, 62L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, 63L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, 64L);
+    }
+
+    private KStreamBuilder setupTopology() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        final KTable<Long, Long> globalCounts = input
+            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+                @Override
+                public KeyValue<Long, String> apply(final Long key, final String value) {
+                    return new KeyValue<>(key, value);
+                }
+            })
+            .groupByKey()
+            .count("global-count");
+        globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
+
+        final KStream<Long, Long> windowedCounts = input
+            .through(INTERMEDIATE_USER_TOPIC)
+            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+                @Override
+                public KeyValue<Long, String> apply(final Long key, final String value) {
+                    // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped
+                    // => want to test "skip over" unprocessed records
+                    // increasing the sleep time only has disadvantage that test run time is increased
+                    Utils.sleep(1000);
+                    return new KeyValue<>(key, value);
+                }
+            })
+            .groupByKey()
+            .count(TimeWindows.of(35).advanceBy(10), "count")
+            .toStream()
+            .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
+                @Override
+                public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
+                    return new KeyValue<>(key.window().start() + key.window().end(), value);
+                }
+            });
+        windowedCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC_2);
+
+        return builder;
+    }
+
+    private void cleanGlobal() {
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(
+            new String[]{
+                "--application-id", APP_ID,
+                "--bootstrap-server", CLUSTER.bootstrapServers(),
+                "--zookeeper", CLUSTER.zKConnectString(),
+                "--input-topics", INPUT_TOPIC,
+                "--intermediate-topics", INTERMEDIATE_USER_TOPIC
+            },
+            cleanUpConfig);
+        Assert.assertEquals(0, exitCode);
+    }
+
+    private void assertInternalTopicsGotDeleted() {
+        final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
+        expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
+        expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
+        expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
+
+        Set<String> allTopics;
+        ZkUtils zkUtils = null;
+        try {
+            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+                30000,
+                30000,
+                JaasUtils.isZkSecurityEnabled());
+
+            do {
+                Utils.sleep(100);
+                allTopics = new HashSet<>();
+                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size());
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+        assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8deedcac/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
index c2f58ee..92290f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
@@ -50,6 +50,7 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
         brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
         brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
         brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+        brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
 
         log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
         broker = new KafkaEmbedded(brokerConfig);
@@ -131,4 +132,4 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
     public void deleteTopic(String topic) {
         broker.deleteTopic(topic);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8deedcac/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
new file mode 100644
index 0000000..734c15b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import kafka.admin.TopicCommand;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
+ * <p>
+ * Resetting the processing state of an application includes the following actions:
+ * <ol>
+ * <li>setting the application's consumer offsets for input and internal topics to zero</li>
+ * <li>skip over all intermediate user topics (i.e., "seekToEnd" for consumers of intermediate topics)</li>
+ * <li>deleting any topics created internally by Kafka Streams for this application</li>
+ * </ol>
+ * <p>
+ * Do only use this tool if <strong>no</strong> application instance is running. Otherwise, the application will get into an invalid state and crash or produce wrong results.
+ * <p>
+ * If you run multiple application instances, running this tool once is sufficient.
+ * However, you need to call {@code KafkaStreams#cleanUp()} before re-starting any instance (to clean local state store directory).
+ * Otherwise, your application is in an invalid state.
+ * <p>
+ * User output topics will not be deleted or modified by this tool.
+ * If downstream applications consume intermediate or output topics, it is the user's responsibility to adjust those applications manually if required.
+ */
+public class StreamsResetter {
+    private static final int EXIT_CODE_SUCCESS = 0;
+    private static final int EXIT_CODE_ERROR = 1;
+
+    private static OptionSpec<String> bootstrapServerOption;
+    private static OptionSpec<String> zookeeperOption;
+    private static OptionSpec<String> applicationIdOption;
+    private static OptionSpec<String> inputTopicsOption;
+    private static OptionSpec<String> intermediateTopicsOption;
+
+    private OptionSet options = null;
+    private final Properties consumerConfig = new Properties();
+    private final List<String> allTopics = new LinkedList<>();
+
+    public int run(final String[] args) {
+        return run(args, new Properties());
+    }
+
+    public int run(final String[] args, final Properties config) {
+        this.consumerConfig.clear();
+        this.consumerConfig.putAll(config);
+
+        int exitCode = EXIT_CODE_SUCCESS;
+
+        ZkUtils zkUtils = null;
+        try {
+            parseArguments(args);
+
+            zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
+                30000,
+                30000,
+                JaasUtils.isZkSecurityEnabled());
+
+            this.allTopics.clear();
+            this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+
+            resetInputAndInternalTopicOffsets();
+            seekToEndIntermediateTopics();
+            deleteInternalTopics(zkUtils);
+        } catch (final Exception e) {
+            exitCode = EXIT_CODE_ERROR;
+            System.err.println("ERROR: " + e.getMessage());
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+
+        return exitCode;
+    }
+
+    private void parseArguments(final String[] args) throws IOException {
+        final OptionParser optionParser = new OptionParser();
+        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)")
+            .withRequiredArg()
+            .ofType(String.class)
+            .describedAs("id")
+            .required();
+        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
+            .withRequiredArg()
+            .ofType(String.class)
+            .defaultsTo("localhost:9092")
+            .describedAs("urls");
+        zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:POST")
+            .withRequiredArg()
+            .ofType(String.class)
+            .defaultsTo("localhost:2181")
+            .describedAs("url");
+        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics")
+            .withRequiredArg()
+            .ofType(String.class)
+            .withValuesSeparatedBy(',')
+            .describedAs("list");
+        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics")
+            .withRequiredArg()
+            .ofType(String.class)
+            .withValuesSeparatedBy(',')
+            .describedAs("list");
+
+        try {
+            this.options = optionParser.parse(args);
+        } catch (final OptionException e) {
+            optionParser.printHelpOn(System.err);
+            throw e;
+        }
+    }
+
+    private void resetInputAndInternalTopicOffsets() {
+        final List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
+
+        if (inputTopics.size() == 0) {
+            System.out.println("No input topics specified.");
+        } else {
+            System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+        }
+
+        final Properties config = new Properties();
+        config.putAll(this.consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        for (final String inTopic : inputTopics) {
+            if (!this.allTopics.contains(inTopic)) {
+                System.out.println("Input topic " + inTopic + " not found. Skipping.");
+            }
+        }
+
+        for (final String topic : this.allTopics) {
+            if (isInputTopic(topic) || isInternalTopic(topic)) {
+                System.out.println("Topic: " + topic);
+
+                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+                    client.subscribe(Collections.singleton(topic));
+                    client.poll(1);
+
+                    final Set<TopicPartition> partitions = client.assignment();
+                    client.seekToBeginning(partitions);
+                    for (final TopicPartition p : partitions) {
+                        client.position(p);
+                    }
+                    client.commitSync();
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Resetting offsets for topic " + topic + " failed.");
+                    throw e;
+                }
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private boolean isInputTopic(final String topic) {
+        return this.options.valuesOf(inputTopicsOption).contains(topic);
+    }
+
+    private void seekToEndIntermediateTopics() {
+        final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
+
+        if (intermediateTopics.size() == 0) {
+            System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
+            return;
+        }
+
+        System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics);
+
+        final Properties config = new Properties();
+        config.putAll(this.consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        for (final String topic : intermediateTopics) {
+            if (this.allTopics.contains(topic)) {
+                System.out.println("Topic: " + topic);
+
+                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+                    client.subscribe(Collections.singleton(topic));
+                    client.poll(1);
+
+                    final Set<TopicPartition> partitions = client.assignment();
+                    client.seekToEnd(partitions);
+                    for (final TopicPartition p : partitions) {
+                        client.position(p);
+                    }
+                    client.commitSync();
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Seek-to-end for topic " + topic + " failed.");
+                    throw e;
+                }
+            } else {
+                System.out.println("Topic " + topic + " not found. Skipping.");
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private void deleteInternalTopics(final ZkUtils zkUtils) {
+        System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption));
+
+        for (final String topic : this.allTopics) {
+            if (isInternalTopic(topic)) {
+                final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
+                    "--zookeeper", this.options.valueOf(zookeeperOption),
+                    "--delete", "--topic", topic});
+                try {
+                    TopicCommand.deleteTopic(zkUtils, commandOptions);
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Deleting topic " + topic + " failed.");
+                    throw e;
+                }
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private boolean isInternalTopic(final String topicName) {
+        return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-")
+            && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
+    }
+
+    public static void main(final String[] args) {
+        System.exit(new StreamsResetter().run(args));
+    }
+
+}

