kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7446: Fix the duration and instant validation messages. (#5930)
Date Tue, 04 Dec 2018 07:00:09 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7283711  KAFKA-7446: Fix the duration and instant validation messages. (#5930)
7283711 is described below

commit 7283711c0d99d484b10dd7c460f7df01a319fc2b
Author: Srinivas Reddy <mrsrinivas@users.noreply.github.com>
AuthorDate: Tue Dec 4 14:59:54 2018 +0800

    KAFKA-7446: Fix the duration and instant validation messages. (#5930)
    
    Changes made as part of this commit:
     - Improved the error messages of the millisecond validation utilities for better readability.
     - Corrected the Javadoc on the `AdvanceInterval` check.
     - Added a caller-specific prefix to make the error messages clearer to developers/users.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
Jacek Laskowski <jacek@japila.pl>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  4 ++-
 .../apache/kafka/streams/internals/ApiUtils.java   | 32 ++++++++++++++++------
 .../apache/kafka/streams/kstream/JoinWindows.java  | 13 ++++++---
 .../apache/kafka/streams/kstream/Materialized.java |  6 +++-
 .../kafka/streams/kstream/SessionWindows.java      |  8 ++++--
 .../apache/kafka/streams/kstream/TimeWindows.java  | 17 ++++++++----
 .../kafka/streams/kstream/UnlimitedWindows.java    |  5 +++-
 .../processor/internals/ProcessorContextImpl.java  |  5 +++-
 .../org/apache/kafka/streams/state/Stores.java     | 11 ++++++--
 .../apache/kafka/streams/state/WindowStore.java    | 14 ++++++----
 .../internals/CompositeReadOnlyWindowStore.java    | 14 ++++++----
 .../state/internals/ReadOnlyWindowStoreStub.java   | 14 ++++++----
 12 files changed, 97 insertions(+), 46 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c29b7bc..420c51f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -82,6 +82,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
 /**
  * A Kafka client that allows for performing continuous computation on input coming from
one or more input topics and
@@ -921,7 +922,8 @@ public class KafkaStreams implements AutoCloseable {
      * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code
long milliseconds}
      */
     public synchronized boolean close(final Duration timeout) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(timeout, "timeout");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+        ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
 
         final long timeoutMs = timeout.toMillis();
         if (timeoutMs < 0) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
index e888d7a..dd3b691 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
@@ -18,43 +18,57 @@ package org.apache.kafka.streams.internals;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Objects;
+
+import static java.lang.String.format;
 
 public final class ApiUtils {
+
+    private static final String MILLISECOND_VALIDATION_FAIL_MSG_FRMT = "Invalid value for
parameter \"%s\" (value was: %s). ";
+
     private ApiUtils() {
     }
 
     /**
      * Validates that milliseconds from {@code duration} can be retrieved.
      * @param duration Duration to check.
-     * @param name Name of params for an error message.
+     * @param messagePrefix Prefix text for an error message.
      * @return Milliseconds from {@code duration}.
      */
-    public static long validateMillisecondDuration(final Duration duration, final String
name) {
+    public static long validateMillisecondDuration(final Duration duration, final String
messagePrefix) {
         try {
             if (duration == null)
-                throw new IllegalArgumentException("[" + Objects.toString(name) + "] shouldn't
be null.");
+                throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
 
             return duration.toMillis();
         } catch (final ArithmeticException e) {
-            throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds.
", e);
+            throw new IllegalArgumentException(messagePrefix + "It can't be converted to
milliseconds.", e);
         }
     }
 
     /**
      * Validates that milliseconds from {@code instant} can be retrieved.
      * @param instant Instant to check.
-     * @param name Name of params for an error message.
+     * @param messagePrefix Prefix text for an error message.
      * @return Milliseconds from {@code instant}.
      */
-    public static long validateMillisecondInstant(final Instant instant, final String name)
{
+    public static long validateMillisecondInstant(final Instant instant, final String messagePrefix)
{
         try {
             if (instant == null)
-                throw new IllegalArgumentException("[" + name + "] shouldn't be null.");
+                throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
 
             return instant.toEpochMilli();
         } catch (final ArithmeticException e) {
-            throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds.
", e);
+            throw new IllegalArgumentException(messagePrefix + "It can't be converted to
milliseconds.", e);
         }
     }
+
+    /**
+     * Generates the prefix message for validateMillisecondXXXXXX() utility
+     * @param value Object to be converted to milliseconds
+     * @param name Object name
+     * @return Error message prefix to use in exception
+     */
+    public static String prepareMillisCheckFailMsgPrefix(final Object value, final String
name) {
+        return format(MILLISECOND_VALIDATION_FAIL_MSG_FRMT, name, value);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 2087009..8256890 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 /**
@@ -128,7 +129,8 @@ public final class JoinWindows extends Windows<Window> {
      * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be
represented as {@code long milliseconds}
      */
     public static JoinWindows of(final Duration timeDifference) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
         return of(timeDifference.toMillis());
     }
 
@@ -161,7 +163,8 @@ public final class JoinWindows extends Windows<Window> {
      */
     @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
     public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
         return before(timeDifference.toMillis());
     }
 
@@ -194,7 +197,8 @@ public final class JoinWindows extends Windows<Window> {
      */
     @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
     public JoinWindows after(final Duration timeDifference) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(timeDifference, "timeDifference");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
+        ApiUtils.validateMillisecondDuration(timeDifference, msgPrefix);
         return after(timeDifference.toMillis());
     }
 
@@ -226,7 +230,8 @@ public final class JoinWindows extends Windows<Window> {
      */
     @SuppressWarnings({"deprecation"}) // removing segments from Windows will fix this
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
         if (afterWindowEnd.toMillis() < 0) {
             throw new IllegalArgumentException("Grace period must not be negative.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index a19412d..a0d6e34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -34,6 +34,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * Used to describe how a {@link StateStore} should be materialized.
  * You can either provide a custom {@link StateStore} backend through one of the provided
methods accepting a supplier
@@ -247,7 +249,9 @@ public class Materialized<K, V, S extends StateStore> {
      * @throws IllegalArgumentException if retention is negative or can't be represented
as {@code long milliseconds}
      */
     public Materialized<K, V, S> withRetention(final Duration retention) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(retention, "retention");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(retention, "retention");
+        ApiUtils.validateMillisecondDuration(retention, msgPrefix);
+
         if (retention.toMillis() < 0) {
             throw new IllegalArgumentException("Retention must not be negative.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 526c9d1..bdecd8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import java.time.Duration;
 import java.util.Objects;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 
@@ -108,7 +109,8 @@ public final class SessionWindows {
      * @throws IllegalArgumentException if {@code inactivityGap} is zero or negative or can't
be represented as {@code long milliseconds}
      */
     public static SessionWindows with(final Duration inactivityGap) {
-        ApiUtils.validateMillisecondDuration(inactivityGap, "inactivityGap");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
+        ApiUtils.validateMillisecondDuration(inactivityGap, msgPrefix);
         return with(inactivityGap.toMillis());
     }
 
@@ -145,7 +147,9 @@ public final class SessionWindows {
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't
be represented as {@code long milliseconds}
      */
     public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+
         if (afterWindowEnd.toMillis() < 0) {
             throw new IllegalArgumentException("Grace period must not be negative.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 46485b1..942b54d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 /**
@@ -125,7 +126,8 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * @throws IllegalArgumentException if the specified window size is zero or negative
or can't be represented as {@code long milliseconds}
      */
     public static TimeWindows of(final Duration size) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(size, "size");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
+        ApiUtils.validateMillisecondDuration(size, msgPrefix);
         return of(size.toMillis());
     }
 
@@ -138,14 +140,15 @@ public final class TimeWindows extends Windows<TimeWindow> {
      *
      * @param advanceMs The advance interval ("hop") in milliseconds of the window, with
the requirement that {@code 0 < advanceMs <= sizeMs}.
      * @return a new window definition with default maintain duration of 1 day
-     * @throws IllegalArgumentException if the advance interval is negative, zero, or larger-or-equal
the window size
+     * @throws IllegalArgumentException if the advance interval is negative, zero, or larger
than the window size
      * @deprecated Use {@link #advanceBy(Duration)} instead
      */
     @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
     @Deprecated
     public TimeWindows advanceBy(final long advanceMs) {
         if (advanceMs <= 0 || advanceMs > sizeMs) {
-            throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval
(0, %d].", sizeMs));
+            throw new IllegalArgumentException(String.format("Window advancement interval
should be more than zero " +
+                    "and less than window duration which is %d ms, but given advancement
interval is: %d ms", sizeMs, advanceMs));
         }
         return new TimeWindows(sizeMs, advanceMs, grace, maintainDurationMs, segments);
     }
@@ -159,11 +162,12 @@ public final class TimeWindows extends Windows<TimeWindow> {
      *
      * @param advance The advance interval ("hop") of the window, with the requirement that
{@code 0 < advance.toMillis() <= sizeMs}.
      * @return a new window definition with default maintain duration of 1 day
-     * @throws IllegalArgumentException if the advance interval is negative, zero, or larger-or-equal
the window size
+     * @throws IllegalArgumentException if the advance interval is negative, zero, or larger
than the window size
      */
     @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
     public TimeWindows advanceBy(final Duration advance) {
-        ApiUtils.validateMillisecondDuration(advance, "advance");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, "advance");
+        ApiUtils.validateMillisecondDuration(advance, msgPrefix);
         return advanceBy(advance.toMillis());
     }
 
@@ -196,7 +200,8 @@ public final class TimeWindows extends Windows<TimeWindow> {
      */
     @SuppressWarnings("deprecation") // will be fixed when we remove segments from Windows
     public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(afterWindowEnd, "afterWindowEnd");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
+        ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
         if (afterWindowEnd.toMillis() < 0) {
             throw new IllegalArgumentException("Grace period must not be negative.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 46d7270..0a45d81 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * The unlimited window specifications used for aggregations.
  * <p>
@@ -82,7 +84,8 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow>
{
      * @throws IllegalArgumentException if the start time is negative or can't be represented
as {@code long milliseconds}
      */
     public UnlimitedWindows startOn(final Instant start) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondInstant(start, "start");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(start, "start");
+        ApiUtils.validateMillisecondInstant(start, msgPrefix);
         return startOn(start.toEpochMilli());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 7c18117..913e34e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -31,6 +31,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.time.Duration;
 import java.util.List;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier
{
 
     private final StreamTask task;
@@ -164,7 +166,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements
Re
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws IllegalArgumentException
{
-        ApiUtils.validateMillisecondDuration(interval, "interval");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
+        ApiUtils.validateMillisecondDuration(interval, msgPrefix);
         return schedule(interval.toMillis(), type, callback);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index f7a1824..7991b0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -34,6 +34,8 @@ import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import java.time.Duration;
 import java.util.Objects;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * Factory for creating state stores in Kafka Streams.
  * <p>
@@ -195,8 +197,10 @@ public class Stores {
                                                                  final Duration windowSize,
                                                                  final boolean retainDuplicates)
throws IllegalArgumentException {
         Objects.requireNonNull(name, "name cannot be null");
-        ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
-        ApiUtils.validateMillisecondDuration(windowSize, "windowSize");
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
 
         final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);
 
@@ -259,7 +263,8 @@ public class Stores {
     @SuppressWarnings("deprecation")
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final Duration retentionPeriod)
{
-        ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
         return persistentSessionStore(name, retentionPeriod.toMillis());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 57792b6..cf5744b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.StateStore;
 
 import java.time.Instant;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * A windowed store interface extending {@link StateStore}.
  *
@@ -93,8 +95,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K,
V>
 
     @Override
     default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant
to) {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetch(key, from.toEpochMilli(), to.toEpochMilli());
     }
 
@@ -116,8 +118,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K,
V>
 
     @Override
     default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final Instant fromTime, final Instant toTime) {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime,
"fromTime"));
+        ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime,
"toTime"));
         return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
     }
 
@@ -135,8 +137,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K,
V>
 
     @Override
     default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final
Instant to) {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetchAll(from.toEpochMilli(), to.toEpochMilli());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index 84b589d..fa37f5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -28,6 +28,8 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
  * org.apache.kafka.streams.processor.internals.ProcessorTopology}
@@ -89,8 +91,8 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
 
     @Override
     public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant
to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetch(key, from.toEpochMilli(), to.toEpochMilli());
     }
 
@@ -113,8 +115,8 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final
Instant fromTime, final Instant toTime) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime,
"fromTime"));
+        ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime,
"toTime"));
         return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
     }
 
@@ -149,8 +151,8 @@ public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetchAll(from.toEpochMilli(), to.toEpochMilli());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index e2757a9..5f18be9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -37,6 +37,8 @@ import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
 /**
  * A very simple window store stub for testing purposes.
  */
@@ -78,8 +80,8 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K,
V>,
 
     @Override
     public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant
to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetch(key, from.toEpochMilli(), to.toEpochMilli());
     }
 
@@ -177,8 +179,8 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K,
V>,
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final
Instant to) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(from, "from");
-        ApiUtils.validateMillisecondInstant(to, "to");
+        ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
         return fetchAll(from.toEpochMilli(), to.toEpochMilli());
     }
 
@@ -232,8 +234,8 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K,
V>,
                                                             final K to,
                                                             final Instant fromTime,
                                                             final Instant toTime) throws
IllegalArgumentException {
-        ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
-        ApiUtils.validateMillisecondInstant(toTime, "toTime");
+        ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime,
"fromTime"));
+        ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime,
"toTime"));
         return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
     }
 


Mime
View raw message