This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 808dc0a MINOR: Update docs with out-dated context.schedule(...) examples (#5924) 808dc0a is described below commit 808dc0a96b3a8e6a6dbecb0473382b39a366d7a2 Author: cadonna AuthorDate: Wed Nov 21 18:48:01 2018 +0100 MINOR: Update docs with out-dated context.schedule(...) examples (#5924) Reviewers: Matthias J. Sax , Bill Bejeck --- docs/streams/developer-guide/testing.html | 4 ++-- .../main/java/org/apache/kafka/streams/kstream/KStream.java | 10 +++++----- .../src/main/java/org/apache/kafka/streams/kstream/KTable.java | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index bdecc43..026b02b 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -255,8 +255,8 @@ public class CustomMaxAggregator implements Processor<String, Long> { @Override public void init(ProcessorContext context) { this.context = context; - context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, time -> flushStore()); - context.schedule(10000, PunctuationType.STREAM_TIME, time -> flushStore()); + context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore()); + context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore()); store = (KeyValueStore<String, Long>) context.getStateStore("aggStore"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 6055199..6d83340 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -365,7 +365,7 @@ public interface KStream { * @param printed options for printing */ void print(final Printed printed); - + /** * Perform an action on each record of {@code KStream}. * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}). @@ -530,7 +530,7 @@ public interface KStream { * this.state = context.getStateStore("myTransformState"); * // punctuate each 1000ms; can access this.state * // can emit as many new KeyValue pairs as required via this.context#forward() - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); * } * * KeyValue transform(K key, V value) { @@ -602,7 +602,7 @@ public interface KStream { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(V value) { @@ -671,7 +671,7 @@ public interface KStream { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(K readOnlyKey, V value) { @@ -737,7 +737,7 @@ public interface KStream { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myProcessorState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * void process(K key, V value) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 5ed0270..3223b75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -440,7 +440,7 @@ public interface KTable { * * void init(ProcessorContext context) { * this.state = (KeyValueStore)context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(K readOnlyKey, V value) { @@ -515,7 +515,7 @@ public interface KTable { * * void init(ProcessorContext context) { * this.state = (KeyValueStore)context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(K readOnlyKey, V value) {