kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.1 updated: MINOR: Update docs with out-dated context.schedule(...) examples (#5924)
Date Wed, 21 Nov 2018 17:49:18 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new c2bfdda  MINOR: Update docs with out-dated context.schedule(...) examples (#5924)
c2bfdda is described below

commit c2bfddaa59535b7253b5c7d4823b4b07f2254d30
Author: cadonna <cadonna@users.noreply.github.com>
AuthorDate: Wed Nov 21 18:48:01 2018 +0100

    MINOR: Update docs with out-dated context.schedule(...) examples (#5924)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 docs/streams/developer-guide/testing.html                      |  4 ++--
 .../main/java/org/apache/kafka/streams/kstream/KStream.java    | 10 +++++-----
 .../src/main/java/org/apache/kafka/streams/kstream/KTable.java |  4 ++--
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index bdecc43..026b02b 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -255,8 +255,8 @@ public class CustomMaxAggregator implements Processor&lt;String, Long&gt;
{
     @Override
     public void init(ProcessorContext context) {
         this.context = context;
-        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, time -&gt; flushStore());
-        context.schedule(10000, PunctuationType.STREAM_TIME, time -&gt; flushStore());
+        context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -&gt; flushStore());
+        context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -&gt; flushStore());
         store = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("aggStore");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 77987a9..4d278b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -365,7 +365,7 @@ public interface KStream<K, V> {
      * @param printed options for printing
      */
     void print(final Printed<K, V> printed);
-    
+
     /**
      * Perform an action on each record of {@code KStream}.
      * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
@@ -530,7 +530,7 @@ public interface KStream<K, V> {
      *                 this.state = context.getStateStore("myTransformState");
      *                 // punctuate each 1000ms; can access this.state
      *                 // can emit as many new KeyValue pairs as required via this.context#forward()
-     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
      *             }
      *
      *             KeyValue transform(K key, V value) {
@@ -602,7 +602,7 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myValueTransformState");
-     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
      *             }
      *
      *             NewValueType transform(V value) {
@@ -671,7 +671,7 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myValueTransformState");
-     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
      *             }
      *
      *             NewValueType transform(K readOnlyKey, V value) {
@@ -737,7 +737,7 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myProcessorState");
-     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
      *             }
      *
      *             void process(K key, V value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index b49f3e4..f5bf3a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -440,7 +440,7 @@ public interface KTable<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
-     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
      *             }
      *
      *             NewValueType transform(K readOnlyKey, V value) {
@@ -515,7 +515,7 @@ public interface KTable<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
-     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
      *             }
      *
      *             NewValueType transform(K readOnlyKey, V value) {


Mime
View raw message