kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-3185: [Streams] Added Kafka Streams Application Reset Tool
Date: Wed, 27 Jul 2016 22:55:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 279f89a67 -> b297cead3


KAFKA-3185: [Streams] Added Kafka Streams Application Reset Tool

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang, Michael G. Noll

Closes #1671 from mjsax/resetTool-0.10.0.1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b297cead
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b297cead
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b297cead

Branch: refs/heads/0.10.0
Commit: b297cead3ef7ee3516590e696975071cadb06fba
Parents: 279f89a
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Jul 27 15:55:06 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 27 15:55:06 2016 -0700

----------------------------------------------------------------------
 bin/kafka-streams-application-reset.sh          |  21 ++
 build.gradle                                    |   2 +
 checkstyle/import-control.xml                   |   5 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |  96 ++++---
 .../apache/kafka/streams/KafkaStreamsTest.java  | 129 +++++++--
 .../integration/ResetIntegrationTest.java       | 253 ++++++++++++++++++
 .../utils/EmbeddedSingleNodeKafkaCluster.java   |   9 +-
 .../integration/utils/IntegrationTestUtils.java | 133 +++++-----
 .../integration/utils/KafkaEmbedded.java        | 105 ++++----
 .../org/apache/kafka/tools/StreamsResetter.java | 260 +++++++++++++++++++
 10 files changed, 849 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/bin/kafka-streams-application-reset.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-streams-application-reset.sh b/bin/kafka-streams-application-reset.sh
new file mode 100755
index 0000000..26ab766
--- /dev/null
+++ b/bin/kafka-streams-application-reset.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.StreamsResetter "$@"
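
The script above just execs org.apache.kafka.tools.StreamsResetter via kafka-run-class.sh. For reference, a minimal sketch of driving the resetter programmatically, using the same flags that ResetIntegrationTest (later in this commit) passes; the application id, server addresses, and topic names are placeholders, and the flag set shown is not necessarily exhaustive:

    import java.util.Properties;

    import org.apache.kafka.tools.StreamsResetter;

    public class ResetToolExample {
        public static void main(final String[] args) {
            // extra config; the integration test passes consumer session/heartbeat settings here
            final Properties config = new Properties();

            final int exitCode = new StreamsResetter().run(
                new String[]{
                    "--application-id", "my-streams-app",    // placeholder
                    "--bootstrap-server", "localhost:9092",  // placeholder
                    "--zookeeper", "localhost:2181",         // placeholder
                    "--input-topics", "inputTopic",          // placeholder
                    "--intermediate-topics", "userTopic"     // placeholder
                },
                config);
            System.exit(exitCode);  // 0 indicates success, as asserted in ResetIntegrationTest
        }
    }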

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 36647b3..e2f4da6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -634,6 +634,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+    compile project(':core')
     compile project(':clients')
     compile project(':log4j-appender')
     compile libs.argparse4j
@@ -682,6 +683,7 @@ project(':streams') {
     testCompile project(':clients').sourceSets.test.output
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output
+    testCompile project(':tools')
     testCompile libs.junit
 
     testRuntime libs.slf4jlog4j

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5f52cce..1052d8e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,6 +123,8 @@
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="net.sourceforge.argparse4j" />
     <allow pkg="org.apache.log4j" />
+    <allow pkg="joptsimple" />
+    <allow pkg="kafka" />
   </subpackage>
 
   <subpackage name="streams">
@@ -148,7 +150,8 @@
       <allow pkg="scala" />
       <allow pkg="scala.collection" />
       <allow pkg="org.I0Itec.zkclient" />
-      <allow pkg="org.hamcrest.CoreMatchers" />
+      <allow pkg="org.hamcrest" />
+      <allow pkg="org.apache.kafka.tools" />
     </subpackage>
 
     <subpackage name="state">

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index af6d973..17c760e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -24,12 +24,14 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -43,7 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * The computational logic can be specified either by using the {@link TopologyBuilder} class to define the a DAG topology of
  * {@link org.apache.kafka.streams.processor.Processor}s or by using the {@link org.apache.kafka.streams.kstream.KStreamBuilder}
  * class which provides the high-level {@link org.apache.kafka.streams.kstream.KStream} DSL to define the transformation.
- *
+ * <p>
  * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or
  * more threads specified in the configs for the processing work.
  * <p>
@@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
  * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
  * <p>
- *
+ * <p>
  * A simple example might look like this:
  * <pre>
  *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
@@ -74,7 +76,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  *    KafkaStreams streams = new KafkaStreams(builder, config);
  *    streams.start();
  * </pre>
- *
  */
 
 @InterfaceStability.Unstable
@@ -99,52 +100,56 @@ public class KafkaStreams {
     // usage only and should not be exposed to users at all.
     private final UUID processId;
 
+    private final StreamsConfig config;
+
     /**
      * Construct the stream instance.
      *
-     * @param builder  the processor topology builder specifying the computational logic
-     * @param props    properties for the {@link StreamsConfig}
+     * @param builder the processor topology builder specifying the computational logic
+     * @param props   properties for the {@link StreamsConfig}
      */
-    public KafkaStreams(TopologyBuilder builder, Properties props) {
+    public KafkaStreams(final TopologyBuilder builder, final Properties props) {
         this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
     }
 
     /**
      * Construct the stream instance.
      *
-     * @param builder  the processor topology builder specifying the computational logic
-     * @param config   the stream configs
+     * @param builder the processor topology builder specifying the computational logic
+     * @param config  the stream configs
      */
-    public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
+    public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config) {
         this(builder, config, new DefaultKafkaClientSupplier());
     }
 
     /**
      * Construct the stream instance.
      *
-     * @param builder         the processor topology builder specifying the computational logic
-     * @param config          the stream configs
-     * @param clientSupplier  the kafka clients supplier which provides underlying producer and consumer clients
-     * for this {@link KafkaStreams} instance
+     * @param builder        the processor topology builder specifying the computational logic
+     * @param config         the stream configs
+     * @param clientSupplier the kafka clients supplier which provides underlying producer and consumer clients
+     *                       for this {@link KafkaStreams} instance
      */
-    public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier) {
+    public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier) {
         // create the metrics
-        Time time = new SystemTime();
+        final Time time = new SystemTime();
 
         this.processId = UUID.randomUUID();
 
+        this.config = config;
+
         // The application ID is a required config and hence should always have value
-        String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+        final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
 
-        List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
+        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+            MetricsReporter.class);
         reporters.add(new JmxReporter(JMX_PREFIX));
 
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+        final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                 TimeUnit.MILLISECONDS);
 
@@ -152,25 +157,26 @@ public class KafkaStreams {
 
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
+            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, this.processId, this.metrics, time);
         }
     }
 
     /**
      * Start the stream instance by starting all its threads.
+     *
      * @throws IllegalStateException if process was already started
      */
     public synchronized void start() {
         log.debug("Starting Kafka Stream process");
 
-        if (state == CREATED) {
-            for (StreamThread thread : threads)
+        if (this.state == CREATED) {
+            for (final StreamThread thread : this.threads)
                 thread.start();
 
-            state = RUNNING;
+            this.state = RUNNING;
 
             log.info("Started Kafka Stream process");
-        } else if (state == RUNNING) {
+        } else if (this.state == RUNNING) {
             throw new IllegalStateException("This process was already started.");
         } else {
             throw new IllegalStateException("Cannot restart after closing.");
@@ -180,40 +186,64 @@ public class KafkaStreams {
     /**
      * Shutdown this stream instance by signaling all the threads to stop,
      * and then wait for them to join.
+     *
      * @throws IllegalStateException if process has not started yet
      */
     public synchronized void close() {
         log.debug("Stopping Kafka Stream process");
 
-        if (state == RUNNING) {
+        if (this.state == RUNNING) {
             // signal the threads to stop and wait
-            for (StreamThread thread : threads)
+            for (final StreamThread thread : this.threads)
                 thread.close();
 
-            for (StreamThread thread : threads) {
+            for (final StreamThread thread : this.threads) {
                 try {
                     thread.join();
-                } catch (InterruptedException ex) {
+                } catch (final InterruptedException ex) {
                     Thread.interrupted();
                 }
             }
         }
 
-        if (state != STOPPED) {
-            metrics.close();
-            state = STOPPED;
+        if (this.state != STOPPED) {
+            this.metrics.close();
+            this.state = STOPPED;
             log.info("Stopped Kafka Stream process");
         }
 
     }
 
     /**
+     * Cleans up local state store directory ({@code state.dir}), by deleting all data with regard to the application-id.
+     * <p>
+     * May only be called either before instance is started or after instance is closed.
+     *
+     * @throws IllegalStateException if instance is currently running
+     */
+    public void cleanUp() {
+        if (this.state == RUNNING) {
+            throw new IllegalStateException("Cannot clean up while running.");
+        }
+
+        final String localApplicationDir = this.config.getString(StreamsConfig.STATE_DIR_CONFIG)
+            + File.separator
+            + this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+
+        log.debug("Clean up local Kafka Streams data in {}", localApplicationDir);
+        log.debug("Removing local Kafka Streams application data in {} for application {}",
+            localApplicationDir,
+            this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
+        Utils.delete(new File(localApplicationDir));
+    }
+
+    /**
      * Sets the handler invoked when a stream thread abruptly terminates due to an uncaught exception.
      *
      * @param eh the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
      */
-    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) {
-        for (StreamThread thread : threads)
+    public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
+        for (final StreamThread thread : this.threads)
             thread.setUncaughtExceptionHandler(eh);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 22d8bf2..af7e681 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,16 +19,21 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Properties;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class KafkaStreamsTest {
 
     @Test
     public void testStartAndClose() throws Exception {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
@@ -36,71 +41,143 @@ public class KafkaStreamsTest {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        KStreamBuilder builder = new KStreamBuilder();
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
 
         streams.start();
         final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int initCountDifference = newInitCount - oldInitCount;
-        Assert.assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
+        assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
 
         streams.close();
         Assert.assertEquals("each reporter initialized should also be closed",
-                oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
+            oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
     @Test
     public void testCloseIsIdempotent() throws Exception {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
 
-        KStreamBuilder builder = new KStreamBuilder();
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.close();
         final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
         streams.close();
         Assert.assertEquals("subsequent close() calls should do nothing",
-                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+            closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
-    @Test
+    @Test(expected = IllegalStateException.class)
     public void testCannotStartOnceClosed() throws Exception {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
 
-        KStreamBuilder builder = new KStreamBuilder();
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.close();
 
         try {
             streams.start();
-        } catch (IllegalStateException e) {
+        } catch (final IllegalStateException e) {
             Assert.assertEquals("Cannot restart after closing.", e.getMessage());
-            return;
+            throw e;
+        } finally {
+            streams.close();
         }
-        Assert.fail("should have caught an exception and returned");
     }
 
-    @Test
+    @Test(expected = IllegalStateException.class)
     public void testCannotStartTwice() throws Exception {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
 
-        KStreamBuilder builder = new KStreamBuilder();
-        KafkaStreams streams = new KafkaStreams(builder, props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
         streams.start();
 
         try {
             streams.start();
-        } catch (IllegalStateException e) {
+        } catch (final IllegalStateException e) {
             Assert.assertEquals("This process was already started.", e.getMessage());
-            return;
+            throw e;
+        } finally {
+            streams.close();
         }
-        Assert.fail("should have caught an exception and returned");
     }
+
+    @Test
+    public void testCleanup() throws Exception {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+
+        streams.cleanUp();
+        streams.start();
+        streams.close();
+        streams.cleanUp();
+    }
+
+    @Test
+    public void testCleanupIsolation() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final String appId1 = "testIsolation-1";
+        final String appId2 = "testIsolation-2";
+        final String stateDir = TestUtils.tempDirectory("kafka-test").getPath();
+        final File stateDirApp1 = new File(stateDir + File.separator + appId1);
+        final File stateDirApp2 = new File(stateDir + File.separator + appId2);
+
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        assertFalse(stateDirApp1.exists());
+        assertFalse(stateDirApp2.exists());
+
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId1);
+        final KafkaStreams streams1 = new KafkaStreams(builder, props);
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId2);
+        final KafkaStreams streams2 = new KafkaStreams(builder, props);
+
+        assertTrue(stateDirApp1.exists());
+        assertTrue(stateDirApp2.exists());
+
+        streams1.cleanUp();
+        assertFalse(stateDirApp1.exists());
+        assertTrue(stateDirApp2.exists());
+
+        streams2.cleanUp();
+        assertFalse(stateDirApp1.exists());
+        assertFalse(stateDirApp2.exists());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testCannotCleanupWhileRunning() throws Exception {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+
+        streams.start();
+        try {
+            streams.cleanUp();
+        } catch (final IllegalStateException e) {
+            Assert.assertEquals("Cannot clean up while running.", e.getMessage());
+            throw e;
+        } finally {
+            streams.close();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
new file mode 100644
index 0000000..28be868
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.StreamsResetter;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+/**
+ * Tests local state store and global application cleanup.
+ */
+public class ResetIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+
+    private static final String APP_ID = "cleanup-integration-test";
+    private static final String INPUT_TOPIC = "inputTopic";
+    private static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
+
+    private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
+    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(INPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2);
+        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
+    }
+
+    @Test
+    public void testReprocessingFromScratchAfterCleanUp() throws Exception {
+        final Properties streamsConfiguration = prepareTest();
+        final Properties resultTopicConsumerConfig = prepareResultConsumer();
+
+        prepareInputData();
+        final KStreamBuilder builder = setupTopology();
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
+        // receive only first values to make sure intermediate user topic is not consumed completely
+        // => required to test "seekToEnd" for intermediate topics
+        final KeyValue<Object, Object> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0);
+
+        streams.close();
+
+        // RESET
+        Utils.sleep(STREAMS_CONSUMER_TIMEOUT);
+        streams.cleanUp();
+        cleanGlobal();
+        assertInternalTopicsGotDeleted();
+        Utils.sleep(CLEANUP_CONSUMER_TIMEOUT);
+
+        // RE-RUN
+        streams = new KafkaStreams(setupTopology(), streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC, 10);
+        final KeyValue<Object, Object> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultTopicConsumerConfig, OUTPUT_TOPIC_2, 1).get(0);
+        streams.close();
+
+        assertThat(resultRerun, equalTo(result));
+        assertThat(resultRerun2, equalTo(result2));
+    }
+
+    private Properties prepareTest() throws Exception {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getPath());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        return streamsConfiguration;
+    }
+
+    private Properties prepareResultConsumer() {
+        final Properties resultTopicConsumerConfig = new Properties();
+        resultTopicConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        resultTopicConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-standard-consumer-" + OUTPUT_TOPIC);
+        resultTopicConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultTopicConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultTopicConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+
+        return resultTopicConsumerConfig;
+    }
+
+    private void prepareInputData() throws Exception {
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, 10L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, 20L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, 30L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, 40L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, 50L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, 60L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, 61L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, 62L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, 63L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, 64L);
+    }
+
+    private KStreamBuilder setupTopology() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        final KTable<Long, Long> globalCounts = input
+            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+                @Override
+                public KeyValue<Long, String> apply(final Long key, final String value) {
+                    return new KeyValue<>(key, value);
+                }
+            })
+            .countByKey("global-count");
+        globalCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
+
+        final KStream<Long, Long> windowedCounts = input
+            .through(INTERMEDIATE_USER_TOPIC)
+            .map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+                @Override
+                public KeyValue<Long, String> apply(final Long key, final String value) {
+                    // must sleep long enough to avoid processing the whole intermediate topic before application gets stopped
+                    // => want to test "skip over" unprocessed records
+                    // increasing the sleep time only has disadvantage that test run time is increased
+                    Utils.sleep(1000);
+                    return new KeyValue<>(key, value);
+                }
+            })
+            .countByKey(TimeWindows.of("count", 35).advanceBy(10))
+            .toStream()
+            .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
+                @Override
+                public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
+                    return new KeyValue<>(key.window().start() + key.window().end(), value);
+                }
+            });
+        windowedCounts.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC_2);
+
+        return builder;
+    }
+
+    private void cleanGlobal() {
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(
+            new String[]{
+                "--application-id", APP_ID,
+                "--bootstrap-server", CLUSTER.bootstrapServers(),
+                "--zookeeper", CLUSTER.zKConnectString(),
+                "--input-topics", INPUT_TOPIC,
+                "--intermediate-topics", INTERMEDIATE_USER_TOPIC
+            },
+            cleanUpConfig);
+        Assert.assertEquals(0, exitCode);
+    }
+
+    private void assertInternalTopicsGotDeleted() {
+        final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
+        expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
+        expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC);
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
+        expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
+
+        Set<String> allTopics;
+        ZkUtils zkUtils = null;
+        try {
+            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
+                30000,
+                30000,
+                JaasUtils.isZkSecurityEnabled());
+
+            do {
+                Utils.sleep(100);
+                allTopics = new HashSet<>();
+                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size());
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+        assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
index 34753ae..b293a02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
@@ -48,6 +48,8 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
         brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
         brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
+        brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
+        brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
 
         log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
         broker = new KafkaEmbedded(brokerConfig);
@@ -125,4 +127,9 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
                             Properties topicConfig) {
         broker.createTopic(topic, partitions, replication, topicConfig);
     }
-}
\ No newline at end of file
+
+    public void deleteTopic(String topic) {
+        broker.deleteTopic(topic);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index c3f9089..14b24aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -55,10 +55,10 @@ public class IntegrationTestUtils {
      * @param maxMessages    Maximum number of messages to read via the consumer.
      * @return The values retrieved via the consumer.
      */
-    public static <V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) {
-        List<V> returnList = new ArrayList<>();
-        List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
-        for (KeyValue<?, V> kv : kvs) {
+    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+        final List<V> returnList = new ArrayList<>();
+        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+        for (final KeyValue<?, V> kv : kvs) {
             returnList.add(kv.value);
         }
         return returnList;
@@ -72,7 +72,7 @@ public class IntegrationTestUtils {
      * @param consumerConfig Kafka consumer configuration
      * @return The KeyValue elements retrieved via the consumer.
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig) {
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig) {
         return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
     }
 
@@ -85,17 +85,17 @@ public class IntegrationTestUtils {
      * @param maxMessages    Maximum number of messages to read via the consumer
      * @return The KeyValue elements retrieved via the consumer
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
-        KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+        final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
         consumer.subscribe(Collections.singletonList(topic));
-        int pollIntervalMs = 100;
-        int maxTotalPollTimeMs = 2000;
+        final int pollIntervalMs = 100;
+        final int maxTotalPollTimeMs = 2000;
         int totalPollTimeMs = 0;
-        List<KeyValue<K, V>> consumedValues = new ArrayList<>();
+        final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
         while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
-            for (ConsumerRecord<K, V> record : records) {
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
             }
         }
@@ -103,7 +103,7 @@ public class IntegrationTestUtils {
         return consumedValues;
     }
 
-    private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
+    private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
         return maxMessages <= 0 || messagesConsumed < maxMessages;
     }
 
@@ -112,10 +112,10 @@ public class IntegrationTestUtils {
      *
      * @param streamsConfiguration Streams configuration settings
      */
-    public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException {
-        String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+    public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws IOException {
+        final String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
         if (path != null) {
-            File node = Paths.get(path).normalize().toFile();
+            final File node = Paths.get(path).normalize().toFile();
             // Only purge state when it's under /tmp.  This is a safety net to prevent accidentally
             // deleting important local directory trees.
             if (node.getAbsolutePath().startsWith("/tmp")) {
@@ -132,11 +132,11 @@ public class IntegrationTestUtils {
      * @param <V>            Value type of the data records
      */
     public static <K, V> void produceKeyValuesSynchronously(
-        String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
+        final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig)
         throws ExecutionException, InterruptedException {
-        Producer<K, V> producer = new KafkaProducer<>(producerConfig);
-        for (KeyValue<K, V> record : records) {
-            Future<RecordMetadata> f = producer.send(
+        final Producer<K, V> producer = new KafkaProducer<>(producerConfig);
+        for (final KeyValue<K, V> record : records) {
+            final Future<RecordMetadata> f = producer.send(
                 new ProducerRecord<>(topic, record.key, record.value));
             f.get();
         }
@@ -144,86 +144,103 @@ public class IntegrationTestUtils {
         producer.close();
     }
 
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                         final Collection<KeyValue<K, V>> records,
+                                                                         final Properties producerConfig,
+                                                                         final Long timestamp)
+        throws ExecutionException, InterruptedException {
+        final Producer<K, V> producer = new KafkaProducer<>(producerConfig);
+        for (final KeyValue<K, V> record : records) {
+            final Future<RecordMetadata> f = producer.send(
+                new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+            f.get();
+        }
+        producer.flush();
+        producer.close();
+    }
+
     public static <V> void produceValuesSynchronously(
-        String topic, Collection<V> records, Properties producerConfig)
+        final String topic, final Collection<V> records, final Properties producerConfig)
         throws ExecutionException, InterruptedException {
-        Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
-        for (V value : records) {
-            KeyValue<Object, V> kv = new KeyValue<>(null, value);
+        final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
+        for (final V value : records) {
+            final KeyValue<Object, V> kv = new KeyValue<>(null, value);
             keyedRecords.add(kv);
         }
         produceKeyValuesSynchronously(topic, keyedRecords, producerConfig);
     }
 
-    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
-                                                                                  String topic,
-                                                                                  int expectedNumRecords) throws InterruptedException {
+    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                  final String topic,
+                                                                                  final int expectedNumRecords) throws InterruptedException {
 
         return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
     }
 
     /**
      * Wait until enough data (key-value records) has been consumed.
-     * @param consumerConfig Kafka Consumer configuration
-     * @param topic          Topic to consume from
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
      * @param expectedNumRecords Minimum number of expected records
-     * @param waitTime       Upper bound in waiting time in milliseconds
+     * @param waitTime           Upper bound in waiting time in milliseconds
      * @return All the records consumed, or null if no records are consumed
      * @throws InterruptedException
-     * @throws AssertionError if the given wait time elapses
+     * @throws AssertionError       if the given wait time elapses
      */
-    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
-                                                                                  String topic,
-                                                                                  int expectedNumRecords,
-                                                                                  long waitTime) throws InterruptedException {
-        List<KeyValue<K, V>> accumData = new ArrayList<>();
-        long startTime = System.currentTimeMillis();
+    public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                  final String topic,
+                                                                                  final int expectedNumRecords,
+                                                                                  final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, V>> accumData = new ArrayList<>();
+        final long startTime = System.currentTimeMillis();
         while (true) {
-            List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
+            final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
             accumData.addAll(readData);
             if (accumData.size() >= expectedNumRecords)
                 return accumData;
             if (System.currentTimeMillis() > startTime + waitTime)
-                throw new AssertionError("Expected " +  expectedNumRecords +
+                throw new AssertionError("Expected " + expectedNumRecords +
                     " but received only " + accumData.size() +
                     " records before timeout " + waitTime + " ms");
             Thread.sleep(Math.min(waitTime, 100L));
         }
     }
 
-    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
-                                                                String topic,
-                                                                int expectedNumRecords) throws InterruptedException {
+    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
+                                                                final String topic,
+                                                                final int expectedNumRecords) throws InterruptedException {
 
         return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
     }
 
     /**
      * Wait until enough data (value records) has been consumed.
-     * @param consumerConfig Kafka Consumer configuration
-     * @param topic          Topic to consume from
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
      * @param expectedNumRecords Minimum number of expected records
-     * @param waitTime       Upper bound in waiting time in milliseconds
+     * @param waitTime           Upper bound in waiting time in milliseconds
      * @return All the records consumed, or null if no records are consumed
      * @throws InterruptedException
-     * @throws AssertionError if the given wait time elapses
+     * @throws AssertionError       if the given wait time elapses
      */
-    public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
-                                                                String topic,
-                                                                int expectedNumRecords,
-                                                                long waitTime) throws InterruptedException {
-        List<V> accumData = new ArrayList<>();
-        long startTime = System.currentTimeMillis();
+    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
+                                                                final String topic,
+                                                                final int expectedNumRecords,
+                                                                final long waitTime) throws InterruptedException {
+        final List<V> accumData = new ArrayList<>();
+        final long startTime = System.currentTimeMillis();
         while (true) {
-            List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
+            final List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
             accumData.addAll(readData);
             if (accumData.size() >= expectedNumRecords)
                 return accumData;
             if (System.currentTimeMillis() > startTime + waitTime)
-                throw new AssertionError("Expected " +  expectedNumRecords +
+                throw new AssertionError("Expected " + expectedNumRecords +
                     " but received only " + accumData.size() +
                     " records before timeout " + waitTime + " ms");
             Thread.sleep(Math.min(waitTime, 100L));
         }
     }
-}
\ No newline at end of file
+}
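
A minimal sketch combining the new produceKeyValuesSynchronouslyWithTimestamp helper with waitUntilMinKeyValueRecordsReceived; the bootstrap server, topic, and group id are placeholders and a running broker is assumed:

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

    public class TimestampedProduceExample {
        public static void main(final String[] args) throws Exception {
            final String topic = "inputTopic";                // placeholder
            final String bootstrapServers = "localhost:9092"; // placeholder

            final Properties producerConfig = new Properties();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // write a single record with an explicit timestamp of 42 ms
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                topic, Collections.singleton(new KeyValue<>(0L, "value")), producerConfig, 42L);

            final Properties consumerConfig = new Properties();
            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer");  // placeholder
            consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
            consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            // block until at least one record is read back, or fail after the default timeout
            final List<KeyValue<Long, String>> consumed =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, 1);
            System.out.println(consumed);
        }
    }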

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 348b46b..8e0d11c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -18,20 +18,6 @@
 package org.apache.kafka.streams.integration.utils;
 
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
@@ -42,11 +28,23 @@ import kafka.utils.SystemTime$;
 import kafka.utils.TestUtils;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
 /**
  * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by
  * default.
- *
+ * <p>
  * Requires a running ZooKeeper instance to connect to.
  */
 public class KafkaEmbedded {
@@ -63,20 +61,21 @@ public class KafkaEmbedded {
 
     /**
      * Creates and starts an embedded Kafka broker.
+     *
      * @param config Broker configuration settings.  Used to modify, for example, on which port the
      *               broker should listen to.  Note that you cannot change the `log.dirs` setting
      *               currently.
      */
-    public KafkaEmbedded(Properties config) throws IOException {
-        tmpFolder = new TemporaryFolder();
-        tmpFolder.create();
-        logDir = tmpFolder.newFolder();
-        effectiveConfig = effectiveConfigFrom(config);
-        boolean loggingEnabled = true;
-        KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
+    public KafkaEmbedded(final Properties config) throws IOException {
+        this.tmpFolder = new TemporaryFolder();
+        this.tmpFolder.create();
+        this.logDir = this.tmpFolder.newFolder();
+        this.effectiveConfig = effectiveConfigFrom(config);
+        final boolean loggingEnabled = true;
+        final KafkaConfig kafkaConfig = new KafkaConfig(this.effectiveConfig, loggingEnabled);
         log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
-            logDir, zookeeperConnect());
-        kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$);
+            this.logDir, zookeeperConnect());
+        this.kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$);
         log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());
     }
@@ -85,12 +84,13 @@ public class KafkaEmbedded {
     /**
      * Creates the configuration for starting the Kafka broker by merging default values with
      * overwrites.
+     *
      * @param initialConfig Broker configuration settings that override the default config.
      * @return
      * @throws IOException
      */
-    private Properties effectiveConfigFrom(Properties initialConfig) throws IOException {
-        Properties effectiveConfig = new Properties();
+    private Properties effectiveConfigFrom(final Properties initialConfig) throws IOException {
+        final Properties effectiveConfig = new Properties();
         effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
         effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1");
         effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092");
@@ -100,17 +100,17 @@ public class KafkaEmbedded {
         effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
 
         effectiveConfig.putAll(initialConfig);
-        effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
+        effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), this.logDir.getAbsolutePath());
         return effectiveConfig;
     }
 
     /**
      * This broker's `metadata.broker.list` value.  Example: `127.0.0.1:9092`.
-     *
+     * <p>
      * You can use this to tell Kafka producers and consumers how to connect to this instance.
      */
     public String brokerList() {
-        return kafka.config().hostName() + ":" + kafka.boundPort(SecurityProtocol.PLAINTEXT);
+        return this.kafka.config().hostName() + ":" + this.kafka.boundPort(SecurityProtocol.PLAINTEXT);
     }
 
 
@@ -118,7 +118,7 @@ public class KafkaEmbedded {
      * The ZooKeeper connection string aka `zookeeper.connect`.
      */
     public String zookeeperConnect() {
-        return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
+        return this.effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
     }
 
     /**
@@ -127,12 +127,12 @@ public class KafkaEmbedded {
     public void stop() {
         log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());
-        kafka.shutdown();
-        kafka.awaitShutdown();
-        log.debug("Removing logs.dir at {} ...", logDir);
-        List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
+        this.kafka.shutdown();
+        this.kafka.awaitShutdown();
+        log.debug("Removing logs.dir at {} ...", this.logDir);
+        final List<String> logDirs = Collections.singletonList(this.logDir.getAbsolutePath());
         CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq());
-        tmpFolder.delete();
+        this.tmpFolder.delete();
         log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());
     }
@@ -142,7 +142,7 @@ public class KafkaEmbedded {
      *
      * @param topic The name of the topic.
      */
-    public void createTopic(String topic) {
+    public void createTopic(final String topic) {
         createTopic(topic, 1, 1, new Properties());
     }
 
@@ -153,7 +153,7 @@ public class KafkaEmbedded {
      * @param partitions  The number of partitions for this topic.
      * @param replication The replication factor for (the partitions of) this topic.
      */
-    public void createTopic(String topic, int partitions, int replication) {
+    public void createTopic(final String topic, final int partitions, final int replication) {
         createTopic(topic, partitions, replication, new Properties());
     }
 
@@ -165,10 +165,10 @@ public class KafkaEmbedded {
      * @param replication The replication factor for (partitions of) this topic.
      * @param topicConfig Additional topic-level configuration settings.
      */
-    public void createTopic(String topic,
-                            int partitions,
-                            int replication,
-                            Properties topicConfig) {
+    public void createTopic(final String topic,
+                            final int partitions,
+                            final int replication,
+                            final Properties topicConfig) {
         log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
             topic, partitions, replication, topicConfig);
 
@@ -176,14 +176,29 @@ public class KafkaEmbedded {
         // createTopic() will only seem to work (it will return without error).  The topic will exist in
         // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
         // topic.
-        ZkClient zkClient = new ZkClient(
+        final ZkClient zkClient = new ZkClient(
             zookeeperConnect(),
             DEFAULT_ZK_SESSION_TIMEOUT_MS,
             DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
             ZKStringSerializer$.MODULE$);
-        boolean isSecure = false;
-        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+        final boolean isSecure = false;
+        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
         AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
         zkClient.close();
     }
-}
\ No newline at end of file
+
+    public void deleteTopic(final String topic) {
+        log.debug("Deleting topic { name: {} }", topic);
+
+        final ZkClient zkClient = new ZkClient(
+            zookeeperConnect(),
+            DEFAULT_ZK_SESSION_TIMEOUT_MS,
+            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+            ZKStringSerializer$.MODULE$);
+        final boolean isSecure = false;
+        final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure);
+        AdminUtils.deleteTopic(zkUtils, topic);
+        zkClient.close();
+    }
+
+}
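
A rough lifecycle sketch for the embedded broker, showing where the newly added deleteTopic() fits; the broker config, topic name, partition count, and replication factor below are made-up examples, not part of the commit.

    // Sketch: start an embedded broker, create a topic, then tear both down again.
    // KafkaEmbedded's constructor declares IOException, so the surrounding test must handle or declare it.
    final KafkaEmbedded broker = new KafkaEmbedded(new Properties()); // empty config -> defaults from effectiveConfigFrom()
    broker.createTopic("demo-topic", 2, 1);                           // 2 partitions, replication factor 1
    // ... exercise producers/consumers against broker.brokerList() ...
    broker.deleteTopic("demo-topic");                                 // new in this commit
    broker.stop();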

http://git-wip-us.apache.org/repos/asf/kafka/blob/b297cead/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
new file mode 100644
index 0000000..734c15b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import kafka.admin.TopicCommand;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
+ * <p>
+ * Resetting the processing state of an application includes the following actions:
+ * <ol>
+ * <li>setting the application's consumer offsets for input and internal topics to zero</li>
+ * <li>skipping over all intermediate user topics (i.e., issuing "seekToEnd" for consumers of intermediate topics)</li>
+ * <li>deleting any topics created internally by Kafka Streams for this application</li>
+ * </ol>
+ * <p>
+ * Only use this tool if <strong>no</strong> application instance is running. Otherwise, the application will end up in an invalid state and may crash or produce incorrect results.
+ * <p>
+ * If you run multiple application instances, running this tool once is sufficient.
+ * However, you need to call {@code KafkaStreams#cleanUp()} before restarting any instance (to clean up the local state store directory).
+ * Otherwise, the application will be in an invalid state.
+ * <p>
+ * User output topics will not be deleted or modified by this tool.
+ * If downstream applications consume intermediate or output topics, it is the user's responsibility to adjust those applications manually if required.
+ */
+public class StreamsResetter {
+    private static final int EXIT_CODE_SUCCESS = 0;
+    private static final int EXIT_CODE_ERROR = 1;
+
+    private static OptionSpec<String> bootstrapServerOption;
+    private static OptionSpec<String> zookeeperOption;
+    private static OptionSpec<String> applicationIdOption;
+    private static OptionSpec<String> inputTopicsOption;
+    private static OptionSpec<String> intermediateTopicsOption;
+
+    private OptionSet options = null;
+    private final Properties consumerConfig = new Properties();
+    private final List<String> allTopics = new LinkedList<>();
+
+    public int run(final String[] args) {
+        return run(args, new Properties());
+    }
+
+    public int run(final String[] args, final Properties config) {
+        this.consumerConfig.clear();
+        this.consumerConfig.putAll(config);
+
+        int exitCode = EXIT_CODE_SUCCESS;
+
+        ZkUtils zkUtils = null;
+        try {
+            parseArguments(args);
+
+            zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
+                30000,
+                30000,
+                JaasUtils.isZkSecurityEnabled());
+
+            this.allTopics.clear();
+            this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+
+            resetInputAndInternalTopicOffsets();
+            seekToEndIntermediateTopics();
+            deleteInternalTopics(zkUtils);
+        } catch (final Exception e) {
+            exitCode = EXIT_CODE_ERROR;
+            System.err.println("ERROR: " + e.getMessage());
+        } finally {
+            if (zkUtils != null) {
+                zkUtils.close();
+            }
+        }
+
+        return exitCode;
+    }
+
+    private void parseArguments(final String[] args) throws IOException {
+        final OptionParser optionParser = new OptionParser();
+        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)")
+            .withRequiredArg()
+            .ofType(String.class)
+            .describedAs("id")
+            .required();
+        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
+            .withRequiredArg()
+            .ofType(String.class)
+            .defaultsTo("localhost:9092")
+            .describedAs("urls");
+        zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:PORT")
+            .withRequiredArg()
+            .ofType(String.class)
+            .defaultsTo("localhost:2181")
+            .describedAs("url");
+        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics")
+            .withRequiredArg()
+            .ofType(String.class)
+            .withValuesSeparatedBy(',')
+            .describedAs("list");
+        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics")
+            .withRequiredArg()
+            .ofType(String.class)
+            .withValuesSeparatedBy(',')
+            .describedAs("list");
+
+        try {
+            this.options = optionParser.parse(args);
+        } catch (final OptionException e) {
+            optionParser.printHelpOn(System.err);
+            throw e;
+        }
+    }
+
+    private void resetInputAndInternalTopicOffsets() {
+        final List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
+
+        if (inputTopics.size() == 0) {
+            System.out.println("No input topics specified.");
+        } else {
+            System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+        }
+
+        final Properties config = new Properties();
+        config.putAll(this.consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        for (final String inTopic : inputTopics) {
+            if (!this.allTopics.contains(inTopic)) {
+                System.out.println("Input topic " + inTopic + " not found. Skipping.");
+            }
+        }
+
+        for (final String topic : this.allTopics) {
+            if (isInputTopic(topic) || isInternalTopic(topic)) {
+                System.out.println("Topic: " + topic);
+
+                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+                    client.subscribe(Collections.singleton(topic));
+                    client.poll(1);
+
+                    final Set<TopicPartition> partitions = client.assignment();
+                    client.seekToBeginning(partitions);
+                    for (final TopicPartition p : partitions) {
+                        client.position(p);
+                    }
+                    client.commitSync();
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Resetting offsets for topic " + topic + " failed.");
+                    throw e;
+                }
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private boolean isInputTopic(final String topic) {
+        return this.options.valuesOf(inputTopicsOption).contains(topic);
+    }
+
+    private void seekToEndIntermediateTopics() {
+        final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
+
+        if (intermediateTopics.size() == 0) {
+            System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
+            return;
+        }
+
+        System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics);
+
+        final Properties config = new Properties();
+        config.putAll(this.consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        for (final String topic : intermediateTopics) {
+            if (this.allTopics.contains(topic)) {
+                System.out.println("Topic: " + topic);
+
+                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+                    client.subscribe(Collections.singleton(topic));
+                    client.poll(1);
+
+                    final Set<TopicPartition> partitions = client.assignment();
+                    client.seekToEnd(partitions);
+                    for (final TopicPartition p : partitions) {
+                        client.position(p);
+                    }
+                    client.commitSync();
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Seek-to-end for topic " + topic + " failed.");
+                    throw e;
+                }
+            } else {
+                System.out.println("Topic " + topic + " not found. Skipping.");
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private void deleteInternalTopics(final ZkUtils zkUtils) {
+        System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption));
+
+        for (final String topic : this.allTopics) {
+            if (isInternalTopic(topic)) {
+                final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
+                    "--zookeeper", this.options.valueOf(zookeeperOption),
+                    "--delete", "--topic", topic});
+                try {
+                    TopicCommand.deleteTopic(zkUtils, commandOptions);
+                } catch (final RuntimeException e) {
+                    System.err.println("ERROR: Deleting topic " + topic + " failed.");
+                    throw e;
+                }
+            }
+        }
+
+        System.out.println("Done.");
+    }
+
+    private boolean isInternalTopic(final String topicName) {
+        return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-")
+            && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
+    }
+
+    public static void main(final String[] args) {
+        System.exit(new StreamsResetter().run(args));
+    }
+
+}
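
For reference, a hedged sketch of invoking the resetter programmatically (equivalent to running it via main()); the application id, topic names, and connection strings are placeholders only.

    // Sketch: all argument values below are examples; run() returns EXIT_CODE_SUCCESS (0) or EXIT_CODE_ERROR (1).
    final String[] args = {
        "--application-id", "my-streams-app",
        "--bootstrap-servers", "localhost:9092",
        "--zookeeper", "localhost:2181",
        "--input-topics", "my-input-topic",
        "--intermediate-topics", "my-through-topic"
    };
    final int exitCode = new StreamsResetter().run(args);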

