kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5520: KIP-171; Extend Consumer Group Reset Offset for Stream Application
Date Wed, 06 Dec 2017 19:38:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fd8f182cc -> 30f08d158


KAFKA-5520: KIP-171; Extend Consumer Group Reset Offset for Stream Application

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application

Merge changes from KIP-198

Ref: https://github.com/apache/kafka/pull/3831

Author: Jorge Quilcate Otoya <quilcate.jorge@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>
Author: Matthias J. Sax <matthias@confluent.io>
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Author: Guozhang Wang <wangguoz@gmail.com>
Author: Apurva Mehta <apurva@confluent.io>
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Author: Jason Gustafson <jason@confluent.io>
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Author: Bill Bejeck <bill@confluent.io>
Author: Dong Lin <lindong28@gmail.com>
Author: Soenke Liebau <soenke.liebau@opencore.com>
Author: Colin P. Mccabe <cmccabe@confluent.io>
Author: Damian Guy <damian.guy@gmail.com>
Author: Xavier Léauté <xl+github@xvrl.net>
Author: Maytee Chinavanichkit <maytee.chinavanichkit@linecorp.com>
Author: Joel Hamill <git config --global user.email>
Author: Paolo Patierno <ppatierno@live.com>
Author: siva santhalingam <siva.santhalingam@gmail.com>
Author: Tommy Becker <tobecker@tivo.com>
Author: Mickael Maison <mickael.maison@gmail.com>
Author: Onur Karaman <okaraman@linkedin.com>
Author: tedyu <yuzhihong@gmail.com>
Author: Xin Li <Xin.Li@trivago.com>
Author: Magnus Edenhill <magnus@edenhill.se>
Author: Manjula K <manjula@kafka-summit.org>
Author: Hugo Louro <hmclouro@gmail.com>
Author: Jeff Widman <jeff@jeffwidman.com>
Author: bartdevylder <bartdevylder@gmail.com>
Author: Ewen Cheslack-Postava <me@ewencp.org>
Author: Jacek Laskowski <jacek@japila.pl>
Author: Tom Bentley <tbentley@redhat.com>
Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4159 from jeqo/feature/kip-171


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30f08d15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30f08d15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30f08d15

Branch: refs/heads/trunk
Commit: 30f08d158a97e0d83b59e5910fbb0a84c5c6d14f
Parents: fd8f182
Author: Jorge Quilcate Otoya <quilcate.jorge@gmail.com>
Authored: Wed Dec 6 11:38:38 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Dec 6 11:38:38 2017 -0800

----------------------------------------------------------------------
 checkstyle/import-control-core.xml              |   3 +
 checkstyle/import-control.xml                   |   4 +
 .../kafka/admin/ConsumerGroupCommand.scala      |   8 +-
 .../main/scala/kafka/tools/StreamsResetter.java | 300 ++++++++++++++++---
 .../admin/ResetConsumerGroupOffsetTest.scala    |  42 +--
 .../AbstractResetIntegrationTest.java           | 263 +++++++++++++---
 .../integration/ResetIntegrationTest.java       |  21 +-
 .../ResetIntegrationWithSslTest.java            |   2 +-
 .../streams/tools/StreamsResetterTest.java      | 293 ++++++++++++++++++
 9 files changed, 835 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/checkstyle/import-control-core.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index bf06a19..4865986 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -60,6 +60,9 @@
     <allow pkg="kafka.consumer" />
     <allow pkg="joptsimple" />
     <allow pkg="org.apache.kafka.clients.consumer" />
+    <allow class="javax.xml.datatype.Duration" />
+    <allow class="javax.xml.datatype.DatatypeFactory" />
+    <allow class="javax.xml.datatype.DatatypeConfigurationException" />
   </subpackage>
 
   <subpackage name="coordinator">

http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 403cae2..a2b508e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -233,6 +233,10 @@
       <allow pkg="kafka.admin" />
     </subpackage>
 
+    <subpackage name="tools">
+      <allow pkg="kafka.tools" />
+    </subpackage>
+
     <subpackage name="state">
       <allow pkg="org.rocksdb" />
     </subpackage>

http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index d71f062..9e35ebc 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -529,8 +529,8 @@ object ConsumerGroupCommand extends Logging {
         case "Empty" | "Dead" =>
           val partitionsToReset = getPartitionsToReset(groupId)
           val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
-          val execute = opts.options.has(opts.executeOpt)
-          if (execute)
+          val dryRun = opts.options.has(opts.dryRunOpt)
+          if (!dryRun)
             getConsumer().commitSync(preparedOffsets.asJava)
           preparedOffsets
         case currentState =>
@@ -727,7 +727,7 @@ object ConsumerGroupCommand extends Logging {
       "Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl +
       "Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. One scenario must be choose" + nl +
       "To define the scope use: --all-topics or --topic. . One scope must be choose, unless you use '--from-file' scenario"
-    val ExecuteDoc = "Execute operation. Supported operations: reset-offsets."
+    val DryRunDoc = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets."
     val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets."
     val ResetToOffsetDoc = "Reset offsets to a specific offset."
     val ResetFromFileDoc = "Reset offsets to values defined in CSV file."
@@ -770,7 +770,7 @@ object ConsumerGroupCommand extends Logging {
                                   .describedAs("command config property file")
                                   .ofType(classOf[String])
     val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc)
-    val executeOpt = parser.accepts("execute", ExecuteDoc)
+    val dryRunOpt = parser.accepts("dry-run", DryRunDoc)
     val exportOpt = parser.accepts("export", ExportDoc)
     val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc)
                            .withRequiredArg()

http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 5539258..4851a94 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -21,11 +21,14 @@ import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;
+import kafka.utils.CommandLineUtils;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -33,8 +36,14 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.Duration;
 import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -74,12 +83,19 @@ public class StreamsResetter {
     private static OptionSpec<String> applicationIdOption;
     private static OptionSpec<String> inputTopicsOption;
     private static OptionSpec<String> intermediateTopicsOption;
+    private static OptionSpec<Long> toOffsetOption;
+    private static OptionSpec<String> toDatetimeOption;
+    private static OptionSpec<String> byDurationOption;
+    private static OptionSpecBuilder toEarliestOption;
+    private static OptionSpecBuilder toLatestOption;
+    private static OptionSpec<String> fromFileOption;
+    private static OptionSpec<Long> shiftByOption;
     private static OptionSpecBuilder dryRunOption;
     private static OptionSpec<String> commandConfigOption;
 
     private OptionSet options = null;
     private final List<String> allTopics = new LinkedList<>();
-    private boolean dryRun = false;
+
 
     public int run(final String[] args) {
         return run(args, new Properties());
@@ -93,7 +109,7 @@ public class StreamsResetter {
 
         try {
             parseArguments(args);
-            dryRun = options.has(dryRunOption);
+            final boolean dryRun = options.has(dryRunOption);
 
             final String groupId = options.valueOf(applicationIdOption);
             final Properties properties = new Properties();
@@ -114,8 +130,8 @@ public class StreamsResetter {
 
             final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
             consumerConfig.putAll(properties);
-            maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig);
-            maybeDeleteInternalTopics(kafkaAdminClient);
+            maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
+            maybeDeleteInternalTopics(kafkaAdminClient, dryRun);
 
         } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
@@ -169,6 +185,24 @@ public class StreamsResetter {
             .ofType(String.class)
             .withValuesSeparatedBy(',')
             .describedAs("list");
+        toOffsetOption = optionParser.accepts("to-offset", "Reset offsets to a specific offset.")
+            .withRequiredArg()
+            .ofType(Long.class);
+        toDatetimeOption = optionParser.accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'")
+            .withRequiredArg()
+            .ofType(String.class);
+        byDurationOption = optionParser.accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'")
+            .withRequiredArg()
+            .ofType(String.class);
+        toEarliestOption = optionParser.accepts("to-earliest", "Reset offsets to earliest offset.");
+        toLatestOption = optionParser.accepts("to-latest", "Reset offsets to latest offset.");
+        fromFileOption = optionParser.accepts("from-file", "Reset offsets to values defined in CSV file.")
+            .withRequiredArg()
+            .ofType(String.class);
+        shiftByOption = optionParser.accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative")
+            .withRequiredArg()
+            .describedAs("number-of-offsets")
+            .ofType(Long.class);
         commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.")
             .withRequiredArg()
             .ofType(String.class)
@@ -184,17 +218,33 @@ public class StreamsResetter {
             printHelp(optionParser);
             throw e;
         }
+
+        scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
+        allScenarioOptions.$plus(toOffsetOption);
+        allScenarioOptions.$plus(toDatetimeOption);
+        allScenarioOptions.$plus(byDurationOption);
+        allScenarioOptions.$plus(toEarliestOption);
+        allScenarioOptions.$plus(toLatestOption);
+        allScenarioOptions.$plus(fromFileOption);
+        allScenarioOptions.$plus(shiftByOption);
+
+        CommandLineUtils.checkInvalidArgs(optionParser, options, toOffsetOption, allScenarioOptions.$minus(toOffsetOption));
+        CommandLineUtils.checkInvalidArgs(optionParser, options, toDatetimeOption, allScenarioOptions.$minus(toDatetimeOption));
+        CommandLineUtils.checkInvalidArgs(optionParser, options, byDurationOption, allScenarioOptions.$minus(byDurationOption));
+        CommandLineUtils.checkInvalidArgs(optionParser, options, toEarliestOption, allScenarioOptions.$minus(toEarliestOption));
+        CommandLineUtils.checkInvalidArgs(optionParser, options, toLatestOption, allScenarioOptions.$minus(toLatestOption));
+        CommandLineUtils.checkInvalidArgs(optionParser, options, fromFileOption, allScenarioOptions.$minus(fromFileOption));
+        CommandLineUtils.checkInvalidArgs(optionParser, options, shiftByOption, allScenarioOptions.$minus(shiftByOption));
     }
 
-    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig) {
+    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig, final boolean dryRun) throws Exception {
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
-
         final List<String> notFoundInputTopics = new ArrayList<>();
         final List<String> notFoundIntermediateTopics = new ArrayList<>();
 
-        String groupId = options.valueOf(applicationIdOption);
+        final String groupId = options.valueOf(applicationIdOption);
 
         if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
             System.out.println("No input or intermediate topics specified. Skipping seek.");
@@ -202,7 +252,7 @@ public class StreamsResetter {
         }
 
         if (inputTopics.size() != 0) {
-            System.out.println("Seek-to-beginning for input topics " + inputTopics);
+            System.out.println("Reset-offsets for input topics " + inputTopics);
         }
         if (intermediateTopics.size() != 0) {
             System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
@@ -215,6 +265,7 @@ public class StreamsResetter {
                 notFoundInputTopics.add(topic);
             } else {
                 topicsToSubscribe.add(topic);
+
             }
         }
         for (final String topic : intermediateTopics) {
@@ -249,9 +300,9 @@ public class StreamsResetter {
                 }
             }
 
-            maybeSeekToBeginning(client, inputTopicPartitions);
+            maybeReset(groupId, client, inputTopicPartitions);
 
-            maybeSeekToEnd(client, intermediateTopicPartitions);
+            maybeSeekToEnd(groupId, client, intermediateTopicPartitions);
 
             if (!dryRun) {
                 for (final TopicPartition p : partitions) {
@@ -274,49 +325,225 @@ public class StreamsResetter {
                 }
             }
 
-        } catch (final RuntimeException e) {
+        } catch (final Exception e) {
             System.err.println("ERROR: Resetting offsets failed.");
             throw e;
         }
         System.out.println("Done.");
     }
 
-    private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client,
+    // visible for testing
+    public void maybeSeekToEnd(final String groupId,
+                                final Consumer<byte[], byte[]> client,
                                 final Set<TopicPartition> intermediateTopicPartitions) {
-
-        final String groupId = options.valueOf(applicationIdOption);
-        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
-
         if (intermediateTopicPartitions.size() > 0) {
             System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
-            for (final String topic : intermediateTopics) {
-                if (allTopics.contains(topic)) {
-                    System.out.println("Topic: " + topic);
+            for (final TopicPartition topicPartition : intermediateTopicPartitions) {
+                if (allTopics.contains(topicPartition.topic())) {
+                    System.out.println("Topic: " + topicPartition.topic());
                 }
             }
-            if (!dryRun) {
-                client.seekToEnd(intermediateTopicPartitions);
+
+            client.seekToEnd(intermediateTopicPartitions);
+        }
+    }
+
+    private void maybeReset(final String groupId,
+                            final Consumer<byte[], byte[]> client,
+                            final Set<TopicPartition> inputTopicPartitions)
+        throws Exception {
+
+        if (inputTopicPartitions.size() > 0) {
+            System.out.println("Following input topics offsets will be reset to (for consumer group " + groupId + ")");
+            if (options.has(toOffsetOption)) {
+                resetOffsetsTo(client, inputTopicPartitions, options.valueOf(toOffsetOption));
+            } else if (options.has(toEarliestOption)) {
+                client.seekToBeginning(inputTopicPartitions);
+            } else if (options.has(toLatestOption)) {
+                client.seekToEnd(inputTopicPartitions);
+            } else if (options.has(shiftByOption)) {
+                shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
+            } else if (options.has(toDatetimeOption)) {
+                final String ts = options.valueOf(toDatetimeOption);
+                final Long timestamp = getDateTime(ts);
+                resetToDatetime(client, inputTopicPartitions, timestamp);
+            } else if (options.has(byDurationOption)) {
+                final String duration = options.valueOf(byDurationOption);
+                final Duration durationParsed = DatatypeFactory.newInstance().newDuration(duration);
+                resetByDuration(client, inputTopicPartitions, durationParsed);
+            } else if (options.has(fromFileOption)) {
+                final String resetPlanPath = options.valueOf(fromFileOption);
+                final Map<TopicPartition, Long> topicPartitionsAndOffset = getTopicPartitionOffsetFromResetPlan(resetPlanPath);
+                resetOffsetsFromResetPlan(client, inputTopicPartitions, topicPartitionsAndOffset);
+            } else {
+                client.seekToBeginning(inputTopicPartitions);
+            }
+
+            for (final TopicPartition p : inputTopicPartitions) {
+                final Long position = client.position(p);
+                System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + position);
             }
         }
     }
 
-    private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
-                                      final Set<TopicPartition> inputTopicPartitions) {
+    // visible for testing
+    public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
+        final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
+        final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
 
-        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
-        final String groupId = options.valueOf(applicationIdOption);
+        final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
+            checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
 
-        if (inputTopicPartitions.size() > 0) {
-            System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")");
-            for (final String topic : inputTopics) {
-                if (allTopics.contains(topic)) {
-                    System.out.println("Topic: " + topic);
-                }
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            final Long offset = validatedTopicPartitionsAndOffset.get(topicPartition);
+            client.seek(topicPartition, offset);
+        }
+    }
+
+    private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(String resetPlanPath) throws IOException, ParseException {
+        final String resetPlanCsv = Utils.readFileAsString(resetPlanPath);
+        return parseResetPlan(resetPlanCsv);
+    }
+
+    private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Duration duration) throws DatatypeConfigurationException {
+        final Date now = new Date();
+        duration.negate().addTo(now);
+        final Long timestamp = now.getTime();
+
+        final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            topicPartitionsAndTimes.put(topicPartition, timestamp);
+        }
+
+        final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
+
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
+            client.seek(topicPartition, offset);
+        }
+    }
+
+    private void resetToDatetime(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long timestamp) {
+        final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            topicPartitionsAndTimes.put(topicPartition, timestamp);
+        }
+
+        final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
+
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
+            client.seek(topicPartition, offset);
+        }
+    }
+
+    // visible for testing
+    public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long shiftBy) {
+        final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
+        final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
+
+        final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            final Long position = client.position(topicPartition);
+            final Long offset = position + shiftBy;
+            topicPartitionsAndOffset.put(topicPartition, offset);
+        }
+
+        final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
+            checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
+
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
+        }
+    }
+
+    // visible for testing
+    public void resetOffsetsTo(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long offset) {
+        final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
+        final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
+
+        final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            topicPartitionsAndOffset.put(topicPartition, offset);
+        }
+
+        final Map<TopicPartition, Long> validatedTopicPartitionsAndOffset =
+            checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
+
+        for (final TopicPartition topicPartition : inputTopicPartitions) {
+            client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
+        }
+    }
+
+    // visible for testing
+    public Long getDateTime(String timestamp) throws ParseException {
+        final String[] timestampParts = timestamp.split("T");
+        if (timestampParts.length < 2) {
+            throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
+        }
+
+        final String secondPart = timestampParts[1];
+        if (secondPart == null || secondPart.isEmpty()) {
+            throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
+        }
+
+        if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) {
+            timestamp = timestamp + "Z";
+        }
+
+        try {
+            final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp);
+            return date.getTime();
+        } catch (ParseException e) {
+            final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp);
+            return date.getTime();
+        }
+    }
+
+    private Map<TopicPartition, Long> parseResetPlan(final String resetPlanCsv) throws ParseException {
+        final Map<TopicPartition, Long> topicPartitionAndOffset = new HashMap<>();
+        if (resetPlanCsv == null || resetPlanCsv.isEmpty()) {
+            throw new ParseException("Error parsing reset plan CSV file. It is empty,", 0);
+        }
+
+        final String[] resetPlanCsvParts = resetPlanCsv.split("\n");
+
+        for (final String line : resetPlanCsvParts) {
+            final String[] lineParts = line.split(",");
+            if (lineParts.length != 3) {
+                throw new ParseException("Reset plan CSV file is not following the format `TOPIC,PARTITION,OFFSET`.", 0);
             }
-            if (!dryRun) {
-                client.seekToBeginning(inputTopicPartitions);
+            final String topic = lineParts[0];
+            final int partition = Integer.parseInt(lineParts[1]);
+            final long offset = Long.parseLong(lineParts[2]);
+            final TopicPartition topicPartition = new TopicPartition(topic, partition);
+            topicPartitionAndOffset.put(topicPartition, offset);
+        }
+
+        return topicPartitionAndOffset;
+    }
+
+    private Map<TopicPartition, Long> checkOffsetRange(final Map<TopicPartition, Long> inputTopicPartitionsAndOffset,
+                                                       final Map<TopicPartition, Long> beginningOffsets,
+                                                       final Map<TopicPartition, Long> endOffsets) {
+        final Map<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<>();
+        for (final Map.Entry<TopicPartition, Long> topicPartitionAndOffset : inputTopicPartitionsAndOffset.entrySet()) {
+            final Long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
+            final Long offset = topicPartitionAndOffset.getValue();
+            if (offset < endOffset) {
+                final Long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
+                if (offset > beginningOffset) {
+                    validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), offset);
+                } else {
+                    System.out.println("New offset (" + offset + ") is lower than earliest offset. Value will be set to " + beginningOffset);
+                    validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), beginningOffset);
+                }
+            } else {
+                System.out.println("New offset (" + offset + ") is higher than latest offset. Value will be set to " + endOffset);
+                validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), endOffset);
             }
         }
+        return validatedTopicPartitionsOffsets;
     }
 
     private boolean isInputTopic(final String topic) {
@@ -327,7 +554,7 @@ public class StreamsResetter {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient) {
+    private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient, final boolean dryRun) {
 
         System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
         List<String> topicsToDelete = new ArrayList<>();
@@ -346,8 +573,9 @@ public class StreamsResetter {
         System.out.println("Done.");
     }
 
-    private void doDelete(final List<String> topicsToDelete,
-                          final KafkaAdminClient adminClient) {
+    // visible for testing
+    public void doDelete(final List<String> topicsToDelete,
+                          final AdminClient adminClient) {
         boolean hasDeleteErrors = false;
         final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
         final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();

http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 8227487..f26d3c4 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -84,7 +84,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsNewConsumerExistingTopic(): Unit = {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic1, "--to-offset", "50", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "new.group", "--topic", topic1, "--to-offset", "50")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
@@ -112,7 +112,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
 
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--dry-run")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -133,7 +133,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     executor.shutdown()
 
-    val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute")
+    val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime))
     val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
     val consumerGroupCommand1 = createConsumerGroupService(opts1)
 
@@ -157,7 +157,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
 
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--dry-run")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -177,7 +177,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     executor.shutdown()
 
-    val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
+    val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint))
     val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
     val consumerGroupCommand1 = createConsumerGroupService(opts1)
 
@@ -218,7 +218,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   private def invokeGetDateTimeMethod(format: SimpleDateFormat) {
     val checkpoint = new Date()
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint))
     val opts  = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
     consumerGroupCommand.getDateTime
@@ -226,7 +226,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsByDuration() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -246,7 +246,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsByDurationToEarliest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -265,7 +265,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsToEarliest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -284,7 +284,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsToLatest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -306,7 +306,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsToCurrentOffset() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -347,7 +347,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsToSpecificOffset() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -367,7 +367,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsShiftPlus() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -388,7 +388,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsShiftMinus() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -410,7 +410,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsShiftByLowerThanEarliest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -431,7 +431,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsShiftByHigherThanLatest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -452,7 +452,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsToEarliestOnOneTopic() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -471,7 +471,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
   @Test
   def testResetOffsetsToEarliestOnOneTopicAndPartition() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest", "--execute")
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -494,7 +494,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
       "--group", group,
       "--topic", topic1,
       "--topic", topic2,
-      "--to-earliest", "--execute")
+      "--to-earliest")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -521,7 +521,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
       "--group", group,
       "--topic", String.format("%s:1", topic1),
       "--topic", String.format("%s:1", topic2),
-      "--to-earliest", "--execute")
+      "--to-earliest")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
@@ -563,7 +563,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consume all messages and save reset offsets plan to file")
 
 
-    val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath)
+    val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run")
     val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec)
     val consumerGroupCommandExec = createConsumerGroupService(optsExec)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index adb887c..9131007 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -50,6 +50,10 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -176,7 +180,7 @@ abstract class AbstractResetIntegrationTest {
         // RESET
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
         streams.cleanUp();
-        cleanGlobal(null, sslConfig);
+        cleanGlobal(sslConfig, false, null, null);
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
@@ -194,7 +198,7 @@ abstract class AbstractResetIntegrationTest {
 
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(null, sslConfig);
+        cleanGlobal(sslConfig, false, null, null);
     }
 
     void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
@@ -248,7 +252,7 @@ abstract class AbstractResetIntegrationTest {
         // RESET
         streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.cleanUp();
-        cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
+        cleanGlobal(sslConfig, true, null, null);
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
@@ -263,8 +267,7 @@ abstract class AbstractResetIntegrationTest {
         final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC_2_RERUN,
-            40
-        );
+            40);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
@@ -272,11 +275,194 @@ abstract class AbstractResetIntegrationTest {
 
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
+        cleanGlobal(sslConfig, true, null, null);
 
         cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
     }
 
+    void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
+        final Properties sslConfig = getClientSslConfig();
+        final Properties streamsConfiguration = prepareTest();
+
+        final Properties resultTopicConsumerConfig = new Properties();
+        if (sslConfig != null) {
+            resultTopicConsumerConfig.putAll(sslConfig);
+        }
+        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
+            bootstrapServers,
+            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+            LongDeserializer.class,
+            LongDeserializer.class));
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+
+        streams.close();
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+
+        // RESET
+        final File resetFile = File.createTempFile("reset", ".csv");
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
+            writer.write(INPUT_TOPIC + ",0,1");
+            writer.close();
+        }
+
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.cleanUp();
+
+        cleanGlobal(sslConfig, false, "--from-file", resetFile.getAbsolutePath());
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
+        assertInternalTopicsGotDeleted(null);
+
+        resetFile.deleteOnExit();
+
+        // RE-RUN
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            5);
+        streams.close();
+
+        result.remove(0);
+        assertThat(resultRerun, equalTo(result));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(sslConfig, false, null, null);
+    }
+
+    void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
+        final Properties sslConfig = getClientSslConfig();
+        final Properties streamsConfiguration = prepareTest();
+
+        final Properties resultTopicConsumerConfig = new Properties();
+        if (sslConfig != null) {
+            resultTopicConsumerConfig.putAll(sslConfig);
+        }
+        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
+            bootstrapServers,
+            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+            LongDeserializer.class,
+            LongDeserializer.class));
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+
+        streams.close();
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+
+        // RESET
+        final File resetFile = File.createTempFile("reset", ".csv");
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
+            writer.write(INPUT_TOPIC + ",0,1");
+            writer.close();
+        }
+
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.cleanUp();
+
+
+        final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+        final Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.DATE, -1);
+
+        cleanGlobal(sslConfig, false, "--to-datetime", format.format(calendar.getTime()));
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
+        assertInternalTopicsGotDeleted(null);
+
+        resetFile.deleteOnExit();
+
+        // RE-RUN
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+        streams.close();
+
+        assertThat(resultRerun, equalTo(result));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(sslConfig, false, null, null);
+    }
+
+    void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
+        final Properties sslConfig = getClientSslConfig();
+        final Properties streamsConfiguration = prepareTest();
+
+        final Properties resultTopicConsumerConfig = new Properties();
+        if (sslConfig != null) {
+            resultTopicConsumerConfig.putAll(sslConfig);
+        }
+        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
+            bootstrapServers,
+            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+            LongDeserializer.class,
+            LongDeserializer.class));
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+
+        streams.close();
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+
+        // RESET
+        final File resetFile = File.createTempFile("reset", ".csv");
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
+            writer.write(INPUT_TOPIC + ",0,1");
+            writer.close();
+        }
+
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.cleanUp();
+        cleanGlobal(sslConfig, false, "--by-duration", "PT1M");
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
+        assertInternalTopicsGotDeleted(null);
+
+        resetFile.deleteOnExit();
+
+        // RE-RUN
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+        streams.close();
+
+        assertThat(resultRerun, equalTo(result));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(sslConfig, false, null, null);
+    }
+
     private Properties prepareTest() throws IOException {
         Properties streamsConfiguration = getClientSslConfig();
         if (streamsConfiguration == null) {
@@ -302,6 +488,10 @@ abstract class AbstractResetIntegrationTest {
     private void prepareInputData() throws Exception {
         cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
 
+        add10InputElements();
+    }
+
+    private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
         Properties producerConfig = getClientSslConfig();
         if (producerConfig == null) {
             producerConfig = new Properties();
@@ -379,40 +569,39 @@ abstract class AbstractResetIntegrationTest {
         return builder.build();
     }
 
-    private void cleanGlobal(final String intermediateUserTopic, final Properties sslConfig) throws Exception {
+    private void cleanGlobal(final Properties sslConfig,
+                             final boolean withIntermediateTopics,
+                             final String resetScenario,
+                             final String resetScenarioArg) throws Exception {
         // leaving --zookeeper arg here to ensure tool works if users add it
-        final String[] parameters;
-        if (intermediateUserTopic != null) {
-            parameters = new String[]{
-                "--application-id", APP_ID + testNo,
+        final List<String> parameterList = new ArrayList<>(
+            Arrays.asList("--application-id", APP_ID + testNo,
                 "--bootstrap-servers", bootstrapServers,
-                "--input-topics", INPUT_TOPIC,
-                "--intermediate-topics", INTERMEDIATE_USER_TOPIC,
-                "--zookeeper", "localhost:2181"
-            };
-        } else {
-            if (sslConfig != null) {
-                final File configFile = TestUtils.tempFile();
-                final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
-                writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n");
-                writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n");
-                writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n");
-                writer.close();
-
-                parameters = new String[]{
-                    "--application-id", APP_ID + testNo,
-                    "--bootstrap-servers", bootstrapServers,
-                    "--input-topics", INPUT_TOPIC,
-                    "--config-file", configFile.getAbsolutePath()
-                };
-            } else {
-                parameters = new String[]{
-                    "--application-id", APP_ID + testNo,
-                    "--bootstrap-servers", bootstrapServers,
-                    "--input-topics", INPUT_TOPIC
-                };
-            }
+                "--input-topics", INPUT_TOPIC));
+        if (withIntermediateTopics) {
+            parameterList.add("--intermediate-topics");
+            parameterList.add(INTERMEDIATE_USER_TOPIC);
+        }
+        if (sslConfig != null) {
+            final File configFile = TestUtils.tempFile();
+            final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
+            writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n");
+            writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n");
+            writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n");
+            writer.close();
+
+            parameterList.add("--config-file");
+            parameterList.add(configFile.getAbsolutePath());
+        }
+        if (resetScenario != null) {
+            parameterList.add(resetScenario);
+        }
+        if (resetScenarioArg != null) {
+            parameterList.add(resetScenarioArg);
         }
+
+        final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
+
         final Properties cleanUpConfig = new Properties();
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index a72bad4..d781d95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -35,6 +35,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER;
+
     static {
         final Properties props = new Properties();
         // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
@@ -55,13 +56,29 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         beforePrepareTest();
     }
 
+
+    @Test
+    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
+        super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
+    }
+
     @Test
     public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
         super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic();
     }
 
     @Test
-    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
-        super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
+    public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
+        super.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic();
+    }
+
+    @Test
+    public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
+        super.testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic();
+    }
+
+    @Test
+    public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
+        super.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
index e58b18c..abf4c38 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -51,6 +51,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER;
+
     static {
         final Properties props = new Properties();
         // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
@@ -89,5 +90,4 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
     public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
         super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/30f08d15/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
new file mode 100644
index 0000000..c6b0f5f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tools;
+
+import kafka.tools.StreamsResetter;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ *
+ */
+public class StreamsResetterTest {
+
+    private static final String TOPIC = "topic1";
+    private final StreamsResetter streamsResetter = new StreamsResetter();
+    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final TopicPartition topicPartition = new TopicPartition(TOPIC, 0);
+    private final Set<TopicPartition> inputTopicPartitions = new HashSet<>(Collections.singletonList(topicPartition));
+
+    @Before
+    public void setUp() {
+        consumer.assign(Collections.singletonList(topicPartition));
+
+        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0L, new byte[] {}, new byte[] {}));
+        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1L, new byte[] {}, new byte[] {}));
+        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 2L, new byte[] {}, new byte[] {}));
+        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 3L, new byte[] {}, new byte[] {}));
+        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 4L, new byte[] {}, new byte[] {}));
+    }
+
+    @Test
+    public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 4L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(3, records.count());
+    }
+
+    @Test
+    public void testResetToSpecificOffsetWhenBeforeBeginningOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 4L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 3L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(2, records.count());
+    }
+
+    @Test
+    public void testResetToSpecificOffsetWhenAfterEndOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 3L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(2, records.count());
+    }
+
+    @Test
+    public void testShiftOffsetByWhenBetweenBeginningAndEndOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 4L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(2, records.count());
+    }
+
+    @Test
+    public void testShiftOffsetByWhenBeforeBeginningOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 4L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(5, records.count());
+    }
+
+    @Test
+    public void testShiftOffsetByWhenAfterEndOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 3L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(2, records.count());
+    }
+
+    @Test
+    public void testResetUsingPlanWhenBetweenBeginningAndEndOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 4L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>();
+        topicPartitionsAndOffset.put(topicPartition, 3L);
+        streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(2, records.count());
+    }
+
+    @Test
+    public void testResetUsingPlanWhenBeforeBeginningOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 4L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 3L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>();
+        topicPartitionsAndOffset.put(topicPartition, 1L);
+        streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(2, records.count());
+    }
+
+    @Test
+    public void testResetUsingPlanWhenAfterEndOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 3L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>();
+        topicPartitionsAndOffset.put(topicPartition, 5L);
+        streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(2, records.count());
+    }
+
+    @Test
+    public void shouldSeekToEndOffset() {
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 3L);
+        consumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+
+        final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
+        intermediateTopicPartitions.add(topicPartition);
+        streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
+
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        assertEquals(2, records.count());
+    }
+
+    @Test
+    public void shouldDeleteTopic() {
+        Cluster cluster = createCluster(1);
+        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
+            env.kafkaClient().setNode(cluster.controller());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().prepareResponse(new DeleteTopicsResponse(Collections.singletonMap(TOPIC, Errors.NONE)));
+            streamsResetter.doDelete(Collections.singletonList(topicPartition.topic()), env.adminClient());
+        }
+    }
+
+    private Cluster createCluster(int numNodes) {
+        HashMap<Integer, Node> nodes = new HashMap<>();
+        for (int i = 0; i != numNodes; ++i) {
+            nodes.put(i, new Node(i, "localhost", 8121 + i));
+        }
+        return new Cluster("mockClusterId", nodes.values(),
+            Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+            Collections.<String>emptySet(), nodes.get(0));
+    }
+
+    @Test
+    public void shouldAcceptValidDateFormats() throws ParseException {
+        //check valid formats
+        invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+        invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+        invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+        invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+        invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+    }
+
+    @Test
+    public void shouldThrowOnInvalidDateFormat() throws ParseException {
+        //check some invalid formats
+        try {
+            invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+            fail("Call to getDateTime should fail");
+        } catch (final Exception e) {
+            e.printStackTrace();
+        }
+
+        try {
+            invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"));
+            fail("Call to getDateTime should fail");
+        } catch (final Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void invokeGetDateTimeMethod(final SimpleDateFormat format) throws ParseException {
+        final Date checkpoint = new Date();
+        final StreamsResetter streamsResetter = new StreamsResetter();
+        final String formattedCheckpoint = format.format(checkpoint);
+        streamsResetter.getDateTime(formattedCheckpoint);
+    }
+}
\ No newline at end of file


Mime
View raw message