From commits-return-10765-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Wed Nov 21 17:48:14 2018 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B39AC182DF for ; Wed, 21 Nov 2018 17:48:14 +0000 (UTC) Received: (qmail 76145 invoked by uid 500); 21 Nov 2018 17:48:14 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 76101 invoked by uid 500); 21 Nov 2018 17:48:14 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 76092 invoked by uid 99); 21 Nov 2018 17:48:14 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Nov 2018 17:48:14 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D4DC1854F3; Wed, 21 Nov 2018 17:48:13 +0000 (UTC) Date: Wed, 21 Nov 2018 17:48:13 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: MINOR: Update docs with out-dated context.schedule(...) examples (#5924) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154282249218.2637.13835441081283508036@gitbox.apache.org> From: mjsax@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 068ab9cefae301f3187ea885d645c425955e77d2 X-Git-Newrev: 808dc0a96b3a8e6a6dbecb0473382b39a366d7a2 X-Git-Rev: 808dc0a96b3a8e6a6dbecb0473382b39a366d7a2 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 808dc0a MINOR: Update docs with out-dated context.schedule(...) examples (#5924) 808dc0a is described below commit 808dc0a96b3a8e6a6dbecb0473382b39a366d7a2 Author: cadonna AuthorDate: Wed Nov 21 18:48:01 2018 +0100 MINOR: Update docs with out-dated context.schedule(...) examples (#5924) Reviewers: Matthias J. Sax , Bill Bejeck --- docs/streams/developer-guide/testing.html | 4 ++-- .../main/java/org/apache/kafka/streams/kstream/KStream.java | 10 +++++----- .../src/main/java/org/apache/kafka/streams/kstream/KTable.java | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index bdecc43..026b02b 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.html @@ -255,8 +255,8 @@ public class CustomMaxAggregator implements Processor<String, Long> { @Override public void init(ProcessorContext context) { this.context = context; - context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, time -> flushStore()); - context.schedule(10000, PunctuationType.STREAM_TIME, time -> flushStore()); + context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore()); + context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore()); store = (KeyValueStore<String, Long>) context.getStateStore("aggStore"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 6055199..6d83340 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -365,7 +365,7 @@ public interface KStream { * @param printed options for printing */ void print(final Printed printed); - + /** * Perform an action on each record of {@code KStream}. * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}). @@ -530,7 +530,7 @@ public interface KStream { * this.state = context.getStateStore("myTransformState"); * // punctuate each 1000ms; can access this.state * // can emit as many new KeyValue pairs as required via this.context#forward() - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); * } * * KeyValue transform(K key, V value) { @@ -602,7 +602,7 @@ public interface KStream { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(V value) { @@ -671,7 +671,7 @@ public interface KStream { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(K readOnlyKey, V value) { @@ -737,7 +737,7 @@ public interface KStream { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myProcessorState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * void process(K key, V value) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 5ed0270..3223b75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -440,7 +440,7 @@ public interface KTable { * * void init(ProcessorContext context) { * this.state = (KeyValueStore)context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(K readOnlyKey, V value) { @@ -515,7 +515,7 @@ public interface KTable { * * void init(ProcessorContext context) { * this.state = (KeyValueStore)context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(K readOnlyKey, V value) {