kafka-commits mailing list archives

From: damian...@apache.org
Subject: kafka git commit: KAFKA-5233; KIP-138: Change punctuate semantics
Date: Wed, 28 Jun 2017 10:26:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a293e1dc0 -> efb060c57


KAFKA-5233; KIP-138: Change punctuate semantics

Implementation for KIP-138: Change punctuate semantics

Author: Michal Borowiecki <michal.borowiecki@openbet.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Eno Thereska <eno.thereska@gmail.com>, Damian Guy <damian.guy@gmail.com>

Closes #3055 from mihbor/KIP-138


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/efb060c5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/efb060c5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/efb060c5

Branch: refs/heads/trunk
Commit: efb060c57f05d1d586bb14c016b0187c60f8e994
Parents: a293e1d
Author: Michal Borowiecki <michal.borowiecki@openbet.com>
Authored: Wed Jun 28 11:26:02 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Jun 28 11:26:02 2017 +0100

----------------------------------------------------------------------
 docs/streams.html                               |  15 +-
 .../wordcount/WordCountProcessorDemo.java       |  36 ++--
 .../kafka/streams/kstream/Transformer.java      |  13 +-
 .../kafka/streams/kstream/ValueTransformer.java |  13 +-
 .../internals/KStreamTransformValues.java       |   8 +
 .../kafka/streams/processor/Cancellable.java    |  23 +++
 .../kafka/streams/processor/Processor.java      |   8 +-
 .../streams/processor/ProcessorContext.java     |  28 ++-
 .../streams/processor/PunctuationType.java      |  34 ++++
 .../kafka/streams/processor/Punctuator.java     |  26 +++
 .../internals/GlobalProcessorContextImpl.java   |  29 ++-
 .../internals/ProcessorContextImpl.java         |  16 +-
 .../processor/internals/ProcessorNode.java      |  18 +-
 .../internals/ProcessorNodePunctuator.java      |  26 +++
 .../processor/internals/PunctuationQueue.java   |  18 +-
 .../internals/PunctuationSchedule.java          |  55 +++++-
 .../streams/processor/internals/Punctuator.java |  23 ---
 .../processor/internals/StandbyContextImpl.java |  12 ++
 .../streams/processor/internals/StreamTask.java |  48 +++--
 .../processor/internals/StreamThread.java       |  38 +++-
 .../internals/AbstractProcessorContextTest.java |   8 +
 .../internals/PunctuationQueueTest.java         |  33 ++--
 .../processor/internals/StreamTaskTest.java     | 191 +++++++++++++++----
 .../apache/kafka/test/MockProcessorContext.java |   7 +
 .../apache/kafka/test/MockProcessorNode.java    |  12 +-
 .../kafka/test/MockProcessorSupplier.java       |  50 +++--
 .../apache/kafka/test/NoOpProcessorContext.java |   8 +-
 27 files changed, 633 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/docs/streams.html
----------------------------------------------------------------------
diff --git a/docs/streams.html b/docs/streams.html
index 625736e..f302add 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -319,13 +319,12 @@
         </p>
 
         <p>
-        The <code>Processor</code> interface provides two main API methods:
-        <code>process</code> and <code>punctuate</code>. The <code>process</code> method is performed on each
-        of the received record; and the <code>punctuate</code> method is performed periodically based on elapsed time.
-        In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the
-        <code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to
-        forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), to commit the current
-        processing progress (<code>context().commit</code>), etc.
+        The <code>Processor</code> interface provides one main API method, the <code>process</code> method,
+        which is performed on each of the received records.
+        In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the <code>init</code> method
+        and use the context to schedule a periodically called punctuation function (<code>context().schedule</code>),
+        to forward the modified / new key-value pair to downstream processors (<code>context().forward</code>),
+        to commit the current processing progress (<code>context().commit</code>), etc.
         </p>
 
         <p>
@@ -344,7 +343,7 @@ public class MyProcessor implements Processor&lt;String, String&gt; {
         this.context = context;
 
         // call this processor's punctuate() method every 1000 milliseconds.
-        this.context.schedule(1000);
+        this.context.schedule(1000, PunctuationType.STREAM_TIME, this::punctuate);
 
         // retrieve the key-value store named "Counts"
         this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
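
For illustration, here is a minimal sketch of a complete processor written against the new three-argument schedule(interval, type, callback) API described in the documentation above. The class name, store name and forwarding logic are illustrative only and go beyond what this hunk shows:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;

        // schedule a stream-time punctuation every 1000 milliseconds using the new callback-based API
        this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // forward the current counts downstream each time stream time advances past the interval
                try (KeyValueIterator<String, Long> iter = kvStore.all()) {
                    while (iter.hasNext()) {
                        final KeyValue<String, Long> entry = iter.next();
                        context.forward(entry.key, entry.value.toString());
                    }
                }
                context.commit();
            }
        });

        // retrieve the key-value store named "Counts"
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }

    @Override
    public void process(final String key, final String value) {
        final Long oldValue = kvStore.get(value);
        kvStore.put(value, oldValue == null ? 1L : oldValue + 1L);
    }

    @Override
    @Deprecated
    public void punctuate(final long timestamp) {
        // no-op: superseded by the Punctuator registered in init()
    }

    @Override
    public void close() {}
}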

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 4a990a6..eceddf0 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -56,9 +58,24 @@ public class WordCountProcessorDemo {
 
                 @Override
                 @SuppressWarnings("unchecked")
-                public void init(ProcessorContext context) {
+                public void init(final ProcessorContext context) {
                     this.context = context;
-                    this.context.schedule(1000);
+                    this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator() {
+                        @Override
+                        public void punctuate(long timestamp) {
+                            try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
+                                System.out.println("----------- " + timestamp + " ----------- ");
+
+                                while (iter.hasNext()) {
+                                    KeyValue<String, Integer> entry = iter.next();
+
+                                    System.out.println("[" + entry.key + ", " + entry.value + "]");
+
+                                    context.forward(entry.key, entry.value.toString());
+                                }
+                            }
+                        }
+                    });
                     this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
                 }
 
@@ -80,19 +97,8 @@ public class WordCountProcessorDemo {
                 }
 
                 @Override
-                public void punctuate(long timestamp) {
-                    try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
-                        System.out.println("----------- " + timestamp + " ----------- ");
-
-                        while (iter.hasNext()) {
-                            KeyValue<String, Integer> entry = iter.next();
-
-                            System.out.println("[" + entry.key + ", " + entry.value + "]");
-
-                            context.forward(entry.key, entry.value.toString());
-                        }
-                    }
-                }
+                @Deprecated
+                public void punctuate(long timestamp) {}
 
                 @Override
                 public void close() {}
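
Since the new Punctuator interface has a single method, applications built against Java 8 can pass a lambda instead of the anonymous class used in the demo above (the demo presumably keeps the anonymous form because the examples still target Java 7). A compressed, hypothetical sketch of the same wiring:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountPunctuationSketch extends AbstractProcessor<String, String> {

    private KeyValueStore<String, Integer> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");

        // the punctuation logic from the demo, passed as a lambda implementing Punctuator
        context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
            try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(entry.key, entry.value.toString());
                }
            }
        });
    }

    @Override
    public void process(final String key, final String word) {
        final Integer oldValue = kvStore.get(word);
        kvStore.put(word, oldValue == null ? 1 : oldValue + 1);
    }
}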

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 7265a11..2eb4921 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
@@ -27,8 +29,8 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
  * This is a stateful record-by-record operation, i.e, {@link #transform(Object, Object)} is invoked individually for
  * each record of a stream and can access and modify a state that is available beyond a single call of
  * {@link #transform(Object, Object)} (cf. {@link KeyValueMapper} for stateless record transformation).
- * Additionally, the interface can be called in regular intervals based on the processing progress
- * (cf. {@link #punctuate(long)}.
+ * Additionally, this {@code Transformer} can {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule}
+ * a method to be {@link Punctuator#punctuate(long) called periodically} with the provided context.
  * <p>
  * Use {@link TransformerSupplier} to provide new instances of {@code Transformer} to Kafka Stream's runtime.
  * <p>
@@ -51,8 +53,8 @@ public interface Transformer<K, V, R> {
      * This is called once per instance when the topology gets initialized.
      * <p>
      * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
-     * {@link ProcessorContext#schedule(long) schedule itself} for periodical calls (cf. {@link #punctuate(long)}), and
-     * to access attached {@link StateStore}s.
+     * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
+     * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s.
      * <p>
      * Note, that {@link ProcessorContext} is updated in the background with the current record's meta data.
      * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}.
@@ -92,9 +94,12 @@ public interface Transformer<K, V, R> {
      * timestamps return by the used {@link TimestampExtractor})
      * and not based on wall-clock time.
      *
+     * @deprecated Please use {@link Punctuator} functional interface instead.
+     *
      * @param timestamp the stream time when {@code punctuate} is being called
      * @return new {@link KeyValue} pair to be forwarded to down stream&mdash;if {@code null} will not be forwarded
      */
+    @Deprecated
     R punctuate(final long timestamp);
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 0936e7a..5463a76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
@@ -27,8 +29,8 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
  * This is a stateful record-by-record operation, i.e, {@link #transform(Object)} is invoked individually for each
  * record of a stream and can access and modify a state that is available beyond a single call of
  * {@link #transform(Object)} (cf. {@link ValueMapper} for stateless value transformation).
- * Additionally, the interface can be called in regular intervals based on the processing progress
- * (cf. {@link #punctuate(long)}.
+ * Additionally, this {@code ValueTransformer} can {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule}
+ * a method to be {@link Punctuator#punctuate(long) called periodically} with the provided context.
  * If {@code ValueTransformer} is applied to a {@link KeyValue} pair record the record's key is preserved.
  * <p>
  * Use {@link ValueTransformerSupplier} to provide new instances of {@code ValueTransformer} to Kafka Stream's runtime.
@@ -48,8 +50,8 @@ public interface ValueTransformer<V, VR> {
      * This is called once per instance when the topology gets initialized.
      * <p>
      * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
-     * {@link ProcessorContext#schedule(long) schedule itself} for periodical calls (cf. {@link #punctuate(long)}), and
-     * to access attached {@link StateStore}s.
+     * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
+     * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s.
      * <p>
      * Note that {@link ProcessorContext} is updated in the background with the current record's meta data.
      * Thus, it only contains valid record meta data when accessed within {@link #transform(Object)}.
@@ -93,9 +95,12 @@ public interface ValueTransformer<V, VR> {
      * timestamps return by the used {@link TimestampExtractor})
      * and not based on wall-clock time.
      *
+     * @deprecated Please use {@link Punctuator} functional interface instead.
+     *
      * @param timestamp the stream time when {@code punctuate} is being called
      * @return must return {@code null}&mdash;otherwise, an {@link StreamsException exception} will be thrown
      */
+    @Deprecated
     VR punctuate(final long timestamp);
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 2e3211c..a6e9aaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -21,9 +21,12 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -98,6 +101,11 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
                     }
 
                     @Override
+                    public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
+                        return context.schedule(interval, type, callback);
+                    }
+
+                    @Override
                     public void schedule(final long interval) {
                         context.schedule(interval);
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
new file mode 100644
index 0000000..82c9edd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Cancellable.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+public interface Cancellable {
+
+    void cancel();
+
+}
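
The new Cancellable handle returned by schedule() lets a processor stop its own punctuation, something that was not possible with the old API. A hypothetical sketch (class name, interval and the ten-firings threshold are illustrative) of a processor that cancels a wall-clock punctuation after it has fired ten times:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

public class SelfCancellingProcessor extends AbstractProcessor<String, String> {

    private Cancellable punctuation;
    private int fired = 0;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // system-time punctuations fire on wall-clock time, even when no records arrive
        punctuation = context.schedule(30000, PunctuationType.SYSTEM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                if (++fired >= 10) {
                    punctuation.cancel(); // no further punctuations will be delivered
                }
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        context().forward(key, value);
    }

    @Override
    public void close() {
        if (punctuation != null) {
            punctuation.cancel(); // cancelling twice is harmless
        }
    }
}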

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index 2aaf45e..2ed17df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -31,8 +31,9 @@ public interface Processor<K, V> {
      * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
      * that contains it is initialized.
      * <p>
-     * If this processor is to be {@link #punctuate(long) called periodically} by the framework, then this method should
-     * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
+     * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
+     * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
+     * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s.
      * 
      * @param context the context; may not be null
      */
@@ -49,9 +50,12 @@ public interface Processor<K, V> {
     /**
      * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
      * during {@link #init(ProcessorContext) initialization}.
+     *
+     * @deprecated Please use {@link Punctuator} functional interface instead.
      * 
      * @param timestamp the stream time when this method is being called
      */
+    @Deprecated
     void punctuate(long timestamp);
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 559e9f7..3468f1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -88,11 +88,37 @@ public interface ProcessorContext {
 
     /**
      * Schedules a periodic operation for processors. A processor may call this method during
+     * {@link Processor#init(ProcessorContext) initialization} or
+     * {@link Processor#process(Object, Object) processing} to
+     * schedule a periodic callback - called a punctuation - to {@link Punctuator#punctuate(long)}.
+     * The type parameter controls what notion of time is used for punctuation:
+     * <ul>
+     *   <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced by the processing of messages
+     *   in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+     *   <b>NOTE:</b> Only advanced if messages arrive</li>
+     *   <li>{@link PunctuationType#SYSTEM_TIME} - uses system time (the wall-clock time),
+     *   which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
+     *   independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited
+     *   by how long an iteration of the processing loop takes to complete</li>
+     * </ul>
+     *
+     * @param interval the time interval between punctuations
+     * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#SYSTEM_TIME}
+     * @param callback a function consuming timestamps representing the current stream or system time
+     * @return a handle allowing cancellation of the punctuation schedule established by this method
+     */
+    Cancellable schedule(long interval, PunctuationType type, Punctuator callback);
+
+    /**
+     * Schedules a periodic operation for processors. A processor may call this method during
      * {@link Processor#init(ProcessorContext) initialization} to
-     * schedule a periodic call called a punctuation to {@link Processor#punctuate(long)}.
+     * schedule a periodic call - called a punctuation - to {@link Processor#punctuate(long)}.
+     *
+     * @deprecated Please use {@link #schedule(long, PunctuationType, Punctuator)} instead.
      *
      * @param interval the time interval between punctuations
      */
+    @Deprecated
     void schedule(long interval);
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java b/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java
new file mode 100644
index 0000000..4dd9300
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * Controls what notion of time is used for punctuation scheduled via {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)} schedule}:
+ * <ul>
+ *   <li>STREAM_TIME - uses "stream time", which is advanced by the processing of messages
+ *   in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+ *   <b>NOTE:</b> Only advanced if messages arrive</li>
+ *   <li>SYSTEM_TIME - uses system time (the wall-clock time),
+ *   which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
+ *   independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited
+ *   by how long an iteration of the processing loop takes to complete</li>
+ * </ul>
+ */
+public enum PunctuationType {
+   STREAM_TIME,
+   SYSTEM_TIME,
+}
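
The practical difference between the two values is easiest to see when both are used side by side: a STREAM_TIME punctuation only fires while records (and therefore extracted timestamps) keep arriving, whereas a SYSTEM_TIME punctuation keeps firing on wall-clock time during quiet periods. A hypothetical sketch combining both:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

public class DualClockProcessor extends AbstractProcessor<String, String> {

    private long recordsSeen = 0;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);

        // STREAM_TIME: advanced by record timestamps, so this only fires while data flows
        context.schedule(60000, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long streamTime) {
                context().forward("records-per-stream-minute", Long.toString(recordsSeen));
                recordsSeen = 0;
            }
        });

        // SYSTEM_TIME: advanced by the wall clock, so this fires even when the topic is idle
        context.schedule(60000, PunctuationType.SYSTEM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long wallClockTime) {
                context().commit();
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        recordsSeen++;
        context().forward(key, value);
    }
}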

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
new file mode 100644
index 0000000..200c1af
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * A functional interface used as an argument to {@link ProcessorContext#schedule(long, PunctuationType, Punctuator)}.
+ */
+public interface Punctuator {
+
+    void punctuate(long timestamp);
+
+}
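
This is the same callback the updated Transformer and ValueTransformer javadoc above refers to: a transformer registers a Punctuator through its ProcessorContext instead of overriding the now-deprecated punctuate(). A hypothetical sketch of a Transformer that emits nothing per record and instead forwards a running count from a stream-time punctuation:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

public class CountingTransformer implements Transformer<String, String, KeyValue<String, Long>> {

    private long count = 0;

    @Override
    public void init(final ProcessorContext context) {
        context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // forward the running count downstream every 10 seconds of stream time
                context.forward("count", count);
            }
        });
    }

    @Override
    public KeyValue<String, Long> transform(final String key, final String value) {
        count++;
        return null; // nothing emitted per record
    }

    @Override
    @Deprecated
    public KeyValue<String, Long> punctuate(final long timestamp) {
        return null; // superseded by the Punctuator registered in init()
    }

    @Override
    public void close() {}
}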

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 34d0c35..4c1d350 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -57,14 +60,22 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         }
     }
 
+
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
     @Override
     public <K, V> void forward(K key, V value, int childIndex) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
 
+
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
     @Override
     public <K, V> void forward(K key, V value, String childName) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");
     }
 
     @Override
@@ -72,9 +83,21 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         //no-op
     }
 
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
+    @Override
+    public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
+        throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
+    }
+
+
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
     @Override
     public void schedule(long interval) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 55cddcc..79c38b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -122,8 +125,19 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     }
 
     @Override
+    public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
+        return task.schedule(interval, type, callback);
+    }
+
+    @Override
+    @Deprecated
     public void schedule(final long interval) {
-        task.schedule(interval);
+        schedule(interval, PunctuationType.STREAM_TIME, new Punctuator() {
+            @Override
+            public void punctuate(final long timestamp) {
+                currentNode().processor().punctuate(timestamp);
+            }
+        });
     }
 
 }
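
Note the backwards-compatibility path above: the deprecated single-argument schedule() is now implemented on top of the new API by wrapping the current node's punctuate() in a STREAM_TIME Punctuator. Code written against the old API, along the lines of this hypothetical sketch, should therefore keep working unchanged, just with deprecation warnings:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LegacyStyleProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        context.schedule(1000); // deprecated one-argument form, now routed through a STREAM_TIME Punctuator
    }

    @Override
    public void process(final String key, final String value) {
        context().forward(key, value);
    }

    @Override
    public void punctuate(final long timestamp) {
        // old callback, now invoked via the compatibility Punctuator registered by schedule(long)
        context().commit();
    }
}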

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8112614..47f6311 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.Punctuator;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -65,14 +66,6 @@ public class ProcessorNode<K, V> {
         }
     };
 
-    private long timestamp;
-    private Runnable punctuateDelegate = new Runnable() {
-        @Override
-        public void run() {
-            processor().punctuate(timestamp);
-        }
-    };
-
     public final Set<String> stateStores;
 
     public ProcessorNode(String name) {
@@ -133,8 +126,13 @@ public class ProcessorNode<K, V> {
         this.nodeMetrics.metrics.measureLatencyNs(time, processDelegate, nodeMetrics.nodeProcessTimeSensor);
     }
 
-    public void punctuate(long timestamp) {
-        this.timestamp = timestamp;
+    public void punctuate(final long timestamp, final Punctuator punctuator) {
+        Runnable punctuateDelegate = new Runnable() {
+            @Override
+            public void run() {
+                punctuator.punctuate(timestamp);
+            }
+        };
         this.nodeMetrics.metrics.measureLatencyNs(time, punctuateDelegate, nodeMetrics.nodePunctuateTimeSensor);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
new file mode 100644
index 0000000..c80a3e8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNodePunctuator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+
+public interface ProcessorNodePunctuator {
+
+    void punctuate(ProcessorNode node, long streamTime, PunctuationType type, Punctuator punctuator);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index 0f51852..ec047e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -16,16 +16,20 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+
 import java.util.PriorityQueue;
 
 public class PunctuationQueue {
 
     private final PriorityQueue<PunctuationSchedule> pq = new PriorityQueue<>();
 
-    public void schedule(PunctuationSchedule sched) {
+    public Cancellable schedule(PunctuationSchedule sched) {
         synchronized (pq) {
             pq.add(sched);
         }
+        return sched.cancellable();
     }
 
     public void close() {
@@ -34,16 +38,20 @@ public class PunctuationQueue {
         }
     }
 
-    public boolean mayPunctuate(long timestamp, Punctuator punctuator) {
+    public boolean mayPunctuate(final long timestamp, final PunctuationType type, final ProcessorNodePunctuator processorNodePunctuator) {
         synchronized (pq) {
             boolean punctuated = false;
             PunctuationSchedule top = pq.peek();
             while (top != null && top.timestamp <= timestamp) {
                 PunctuationSchedule sched = top;
                 pq.poll();
-                punctuator.punctuate(sched.node(), timestamp);
-                pq.add(sched.next(timestamp));
-                punctuated = true;
+
+                if (!sched.isCancelled()) {
+                    processorNodePunctuator.punctuate(sched.node(), timestamp, type, sched.punctuator());
+                    pq.add(sched.next(timestamp));
+                    punctuated = true;
+                }
+
 
                 top = pq.peek();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index be792ba..cf50005 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -16,30 +16,73 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.Punctuator;
+
 public class PunctuationSchedule extends Stamped<ProcessorNode> {
 
-    final long interval;
+    private final long interval;
+    private final Punctuator punctuator;
+    private boolean isCancelled = false;
+    // this Cancellable will be re-pointed at the successor schedule in next()
+    private final RepointableCancellable cancellable;
 
-    public PunctuationSchedule(ProcessorNode node, long interval) {
-        this(node, 0L, interval);
+    PunctuationSchedule(ProcessorNode node, long interval, Punctuator punctuator) {
+        this(node, 0L, interval, punctuator, new RepointableCancellable());
+        cancellable.setSchedule(this);
     }
 
-    public PunctuationSchedule(ProcessorNode node, long time, long interval) {
+    private PunctuationSchedule(ProcessorNode node, long time, long interval, Punctuator punctuator, RepointableCancellable cancellable) {
         super(node, time);
         this.interval = interval;
+        this.punctuator = punctuator;
+        this.cancellable = cancellable;
     }
 
     public ProcessorNode node() {
         return value;
     }
 
+    public Punctuator punctuator() {
+        return punctuator;
+    }
+
+    public Cancellable cancellable() {
+        return cancellable;
+    }
+
+    void markCancelled() {
+        isCancelled = true;
+    }
+
+    boolean isCancelled() {
+        return isCancelled;
+    }
+
     public PunctuationSchedule next(long currTimestamp) {
+        PunctuationSchedule nextSchedule;
         // we need to special handle the case when it is firstly triggered (i.e. the timestamp
         // is equal to the interval) by reschedule based on the currTimestamp
         if (timestamp == 0L)
-            return new PunctuationSchedule(value, currTimestamp + interval, interval);
+            nextSchedule = new PunctuationSchedule(value, currTimestamp + interval, interval, punctuator, cancellable);
         else
-            return new PunctuationSchedule(value, timestamp + interval, interval);
+            nextSchedule = new PunctuationSchedule(value, timestamp + interval, interval, punctuator, cancellable);
+
+        cancellable.setSchedule(nextSchedule);
+
+        return nextSchedule;
     }
 
+    private static class RepointableCancellable implements Cancellable {
+        private PunctuationSchedule schedule;
+
+        synchronized void setSchedule(PunctuationSchedule schedule) {
+            this.schedule = schedule;
+        }
+
+        @Override
+        synchronized public void cancel() {
+            schedule.markCancelled();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
deleted file mode 100644
index 4bac97d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-public interface Punctuator {
-
-    void punctuate(ProcessorNode node, long streamTime);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 0791c67..812a4ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -20,6 +20,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
@@ -156,6 +159,15 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
+    public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
+        throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
+    }
+
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
+    @Override
+    @Deprecated
     public void schedule(final long interval) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 55e1ffe..dfb28f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -29,7 +29,10 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -46,7 +49,7 @@ import static java.util.Collections.singleton;
 /**
  * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
  */
-public class StreamTask extends AbstractTask implements Punctuator {
+public class StreamTask extends AbstractTask implements ProcessorNodePunctuator {
 
     private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
 
@@ -54,7 +57,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
-    private final PunctuationQueue punctuationQueue;
+    private final PunctuationQueue streamTimePunctuationQueue;
+    private final PunctuationQueue systemTimePunctuationQueue;
 
     private final Map<TopicPartition, Long> consumedOffsets;
     private final RecordCollector recordCollector;
@@ -109,7 +113,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       final Time time,
                       final Producer<byte[], byte[]> producer) {
         super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache, config);
-        punctuationQueue = new PunctuationQueue();
+        streamTimePunctuationQueue = new PunctuationQueue();
+        systemTimePunctuationQueue = new PunctuationQueue();
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         this.metrics = new TaskMetrics(metrics);
 
@@ -219,7 +224,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @throws IllegalStateException if the current node is not null
      */
     @Override
-    public void punctuate(final ProcessorNode node, final long timestamp) {
+    public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
         if (processorContext.currentNode() != null) {
             throw new IllegalStateException(String.format("%s Current node is not null", logPrefix));
         }
@@ -227,11 +232,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
         updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
 
         if (log.isTraceEnabled()) {
-            log.trace("{} Punctuating processor {} with timestamp {}", logPrefix, node.name(), timestamp);
+            log.trace("{} Punctuating processor {} with timestamp {} and punctuation type {}", logPrefix, node.name(), timestamp, type);
         }
 
         try {
-            node.punctuate(timestamp);
+            node.punctuate(timestamp, punctuator);
         } catch (final KafkaException e) {
             throw new StreamsException(String.format("%s Exception caught while punctuating processor '%s'", logPrefix,  node.name()), e);
         } finally {
@@ -484,14 +489,24 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * Schedules a punctuation for the processor
      *
      * @param interval  the interval in milliseconds
+     * @param type
      * @throws IllegalStateException if the current node is not null
      */
-    public void schedule(final long interval) {
+    public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
         if (processorContext.currentNode() == null) {
             throw new IllegalStateException(String.format("%s Current node is null", logPrefix));
         }
 
-        punctuationQueue.schedule(new PunctuationSchedule(processorContext.currentNode(), interval));
+        final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), interval, punctuator);
+
+        switch (type) {
+            case STREAM_TIME:
+                return streamTimePunctuationQueue.schedule(schedule);
+            case SYSTEM_TIME:
+                return systemTimePunctuationQueue.schedule(schedule);
+            default:
+                throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
+        }
     }
 
     /**
@@ -502,10 +517,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
     }
 
     /**
-     * Possibly trigger registered punctuation functions if
+     * Possibly trigger registered stream-time punctuation functions if
      * current partition group timestamp has reached the defined stamp
+     * Note, this is only called in the presence of new records
      */
-    boolean maybePunctuate() {
+    boolean maybePunctuateStreamTime() {
         final long timestamp = partitionGroup.timestamp();
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
@@ -513,11 +529,21 @@ public class StreamTask extends AbstractTask implements Punctuator {
         if (timestamp == TimestampTracker.NOT_KNOWN) {
             return false;
         } else {
-            return punctuationQueue.mayPunctuate(timestamp, this);
+            return streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this);
         }
     }
 
     /**
+     * Possibly trigger registered system-time punctuation functions if
+     * current system timestamp has reached the defined stamp
+     * Note, this is called irrespective of the presence of new records
+     */
+    boolean maybePunctuateSystemTime() {
+        final long timestamp = time.milliseconds();
+
+        return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.SYSTEM_TIME, this);
+    }
+    /**
      * Request committing the current task's state
      */
     void needCommit() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3fd7832..ae344b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -546,7 +546,7 @@ public class StreamThread extends Thread {
             if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
                 streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
                 addRecordsToTasks(records);
-                final long totalProcessed = processAndPunctuate(activeTasks, recordsProcessedBeforeCommit);
+                final long totalProcessed = processAndPunctuateStreamTime(activeTasks, recordsProcessedBeforeCommit);
                 if (totalProcessed > 0) {
                     final long processLatency = computeLatency();
                     streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
@@ -556,6 +556,7 @@ public class StreamThread extends Thread {
                 }
             }
 
+            maybePunctuateSystemTime();
             maybeCommit(timerStartedMs);
             maybeUpdateStandbyTasks(timerStartedMs);
             maybeClean(timerStartedMs);
@@ -653,8 +654,8 @@ public class StreamThread extends Thread {
      *                                     if UNLIMITED_RECORDS, then commit is never called
      * @return Number of records processed since last commit.
      */
-    private long processAndPunctuate(final Map<TaskId, StreamTask> tasks,
-                                     final long recordsProcessedBeforeCommit) {
+    private long processAndPunctuateStreamTime(final Map<TaskId, StreamTask> tasks,
+                                               final long recordsProcessedBeforeCommit) {
 
         long totalProcessedEachRound;
         long totalProcessedSinceLastMaybeCommit = 0;
@@ -699,7 +700,7 @@ public class StreamThread extends Thread {
             @Override
             public void apply(final StreamTask task) {
                 name = "punctuate";
-                maybePunctuate(task);
+                maybePunctuateStreamTime(task);
                 if (task.commitNeeded()) {
                     name = "commit";
 
@@ -721,11 +722,11 @@ public class StreamThread extends Thread {
         return totalProcessedSinceLastMaybeCommit;
     }
 
-    private void maybePunctuate(final StreamTask task) {
+    private void maybePunctuateStreamTime(final StreamTask task) {
         try {
             // check whether we should punctuate based on the task's partition group timestamp;
             // which are essentially based on record timestamp.
-            if (task.maybePunctuate()) {
+            if (task.maybePunctuateStreamTime()) {
                 streamsMetrics.punctuateTimeSensor.record(computeLatency(), timerStartedMs);
             }
         } catch (final KafkaException e) {
@@ -734,6 +735,31 @@ public class StreamThread extends Thread {
         }
     }
 
+    private void maybePunctuateSystemTime() {
+        final RuntimeException e = performOnStreamTasks(new StreamTaskAction() {
+            @Override
+            public String name() {
+                return "punctuate";
+            }
+
+            @Override
+            public void apply(final StreamTask task) {
+                try {
+                    // check whether we should punctuate based on system timestamp
+                    if (task.maybePunctuateSystemTime()) {
+                        streamsMetrics.punctuateTimeSensor.record(computeLatency(), timerStartedMs);
+                    }
+                } catch (final KafkaException e) {
+                    log.error("{} Failed to punctuate active task {}: {}", logPrefix, task.id(), e);
+                    throw e;
+                }
+            }
+        });
+        if (e != null) {
+            throw e;
+        }
+    }
+
     /**
      * Adjust the number of records that should be processed by scheduler. This avoids
      * scenarios where the processing time is higher than the commit time.

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 6adaa42..e0eed76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -18,6 +18,9 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -145,6 +148,11 @@ public class AbstractProcessorContextTest {
         }
 
         @Override
+        public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
+            return null;
+        }
+
+        @Override
         public void schedule(final long interval) {
 
         }

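The new ProcessorContext#schedule(interval, PunctuationType, Punctuator) seen in this test context returns a Cancellable handle. A minimal usage sketch, not part of this commit, of a processor scheduling a stream-time punctuation and releasing it on close(); the class name, 30-second interval and emitted key are illustrative only:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

public class StreamTimeSummaryProcessor extends AbstractProcessor<String, Long> {

    private Cancellable punctuation;
    private long count = 0;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // STREAM_TIME punctuation advances with the record timestamps observed by the task.
        punctuation = context.schedule(30000L, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // emit a running count once stream time has advanced by the interval
                context().forward("count", count);
            }
        });
    }

    @Override
    public void process(final String key, final Long value) {
        count++;
    }

    @Override
    public void close() {
        punctuation.cancel();  // stop further punctuate() calls for this schedule
    }
}
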
http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index a23ff75..1570c9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -28,34 +30,41 @@ public class PunctuationQueueTest {
 
     @Test
     public void testPunctuationInterval() {
-        TestProcessor processor = new TestProcessor();
-        ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
-        PunctuationQueue queue = new PunctuationQueue();
+        final TestProcessor processor = new TestProcessor();
+        final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
+        final PunctuationQueue queue = new PunctuationQueue();
+        final Punctuator punctuator = new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                node.processor().punctuate(timestamp);
+            }
+        };
 
-        PunctuationSchedule sched = new PunctuationSchedule(node, 100L);
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 100L, punctuator);
         final long now = sched.timestamp - 100L;
 
         queue.schedule(sched);
 
-        Punctuator punctuator = new Punctuator() {
-            public void punctuate(ProcessorNode node, long time) {
-                node.processor().punctuate(time);
+        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
+            @Override
+            public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) {
+                punctuator.punctuate(time);
             }
         };
 
-        queue.mayPunctuate(now, punctuator);
+        queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(0, processor.punctuatedAt.size());
 
-        queue.mayPunctuate(now + 99L, punctuator);
+        queue.mayPunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(0, processor.punctuatedAt.size());
 
-        queue.mayPunctuate(now + 100L, punctuator);
+        queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(1, processor.punctuatedAt.size());
 
-        queue.mayPunctuate(now + 199L, punctuator);
+        queue.mayPunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(1, processor.punctuatedAt.size());
 
-        queue.mayPunctuate(now + 200L, punctuator);
+        queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
         assertEquals(2, processor.punctuatedAt.size());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 130783d..1a0bebe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -38,7 +38,10 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
@@ -86,10 +89,11 @@ public class StreamTaskTest {
 
     private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(topic1, intDeserializer, intDeserializer);
     private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(topic2, intDeserializer, intDeserializer);
-    private final MockProcessorNode<Integer, Integer>  processor = new MockProcessorNode<>(10L);
+    private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10L);
+    private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.SYSTEM_TIME);
 
     private final ProcessorTopology topology = new ProcessorTopology(
-            Arrays.<ProcessorNode>asList(source1, source2, processor),
+            Arrays.<ProcessorNode>asList(source1, source2, processorStreamTime, processorSystemTime),
             new HashMap<String, SourceNode>() {
                 {
                     put(topic1[0], source1);
@@ -117,6 +121,14 @@ public class StreamTaskTest {
     private StreamsConfig config;
     private StreamsConfig eosConfig;
     private StreamTask task;
+    private long punctuatedAt;
+
+    private Punctuator punctuator = new Punctuator() {
+        @Override
+        public void punctuate(long timestamp) {
+            punctuatedAt = timestamp;
+        }
+    };
 
     private StreamsConfig createConfig(final boolean enableEoS) throws Exception {
         return new StreamsConfig(new Properties() {
@@ -133,14 +145,13 @@ public class StreamTaskTest {
         });
     }
 
-
-
-
     @Before
     public void setup() throws Exception {
         consumer.assign(Arrays.asList(partition1, partition2));
-        source1.addChild(processor);
-        source2.addChild(processor);
+        source1.addChild(processorStreamTime);
+        source2.addChild(processorStreamTime);
+        source1.addChild(processorSystemTime);
+        source2.addChild(processorSystemTime);
         config = createConfig(false);
         eosConfig = createConfig(true);
         stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
@@ -282,7 +293,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testMaybePunctuate() throws Exception {
+    public void testMaybePunctuateStreamTime() throws Exception {
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -295,42 +306,42 @@ public class StreamTaskTest {
                 new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
         ));
 
-        assertTrue(task.maybePunctuate());
+        assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
         assertEquals(5, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
 
-        assertFalse(task.maybePunctuate());
+        assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
         assertEquals(4, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
-        assertTrue(task.maybePunctuate());
+        assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
         assertEquals(3, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
 
-        assertFalse(task.maybePunctuate());
+        assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
         assertEquals(2, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
-        assertTrue(task.maybePunctuate());
+        assertTrue(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
         assertEquals(1, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
 
-        assertFalse(task.maybePunctuate());
+        assertFalse(task.maybePunctuateStreamTime());
 
         assertTrue(task.process());
         assertEquals(0, task.numBuffered());
@@ -338,9 +349,71 @@ public class StreamTaskTest {
         assertEquals(3, source2.numReceived);
 
         assertFalse(task.process());
-        assertFalse(task.maybePunctuate());
+        assertFalse(task.maybePunctuateStreamTime());
+
+        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 30L, 40L);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCancelPunctuateStreamTime() throws Exception {
+        task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
+
+        task.addRecords(partition2, records(
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
+
+        assertTrue(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        assertTrue(task.process());
+
+        processorStreamTime.supplier.scheduleCancellable.cancel();
+
+        assertFalse(task.maybePunctuateStreamTime());
+
+        processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
+    }
+
+    @Test
+    public void shouldPunctuateSystemTimeWhenIntervalElapsed() throws Exception {
+        long now = time.milliseconds();
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now + 10, now + 20, now + 30);
+    }
+
+    @Test
+    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() throws Exception {
+        long now = time.milliseconds();
+        assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
+        time.sleep(9);
+        assertFalse(task.maybePunctuateSystemTime());
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now);
+    }
 
-        processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
+    @Test
+    public void testCancelPunctuateSystemTime() throws Exception {
+        long now = time.milliseconds();
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        processorSystemTime.supplier.scheduleCancellable.cancel();
+        time.sleep(10);
+        assertFalse(task.maybePunctuateSystemTime());
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now + 10);
     }
 
     @SuppressWarnings("unchecked")
@@ -388,10 +461,10 @@ public class StreamTaskTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings(value = {"unchecked", "deprecation"})
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() throws Exception {
-        final ProcessorNode punctuator = new ProcessorNode("test", new AbstractProcessor() {
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingDeprecated() throws Exception {
+        final Processor processor = new AbstractProcessor() {
             @Override
             public void init(final ProcessorContext context) {
                 context.schedule(1);
@@ -404,11 +477,51 @@ public class StreamTaskTest {
             public void punctuate(final long timestamp) {
                 throw new KafkaException("KABOOM!");
             }
-        }, Collections.<String>emptySet());
+        };
+
+        final ProcessorNode punctuator = new ProcessorNode("test", processor, Collections.<String>emptySet());
+        punctuator.init(new NoOpProcessorContext());
+
+        try {
+            task.punctuate(punctuator, 1, PunctuationType.STREAM_TIME, new Punctuator() {
+                @Override
+                public void punctuate(long timestamp) {
+                    processor.punctuate(timestamp);
+                }
+            });
+            fail("Should've thrown StreamsException");
+        } catch (final StreamsException e) {
+            final String message = e.getMessage();
+            assertTrue("message=" + message + " should contain processor", message.contains("processor 'test'"));
+            assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() throws Exception {
+        final Processor processor = new AbstractProcessor() {
+            @Override
+            public void init(final ProcessorContext context) {
+            }
+
+            @Override
+            public void process(final Object key, final Object value) {}
+
+            @Override
+            public void punctuate(final long timestamp) {}
+        };
+
+        final ProcessorNode punctuator = new ProcessorNode("test", processor, Collections.<String>emptySet());
         punctuator.init(new NoOpProcessorContext());
 
         try {
-            task.punctuate(punctuator, 1);
+            task.punctuate(punctuator, 1, PunctuationType.STREAM_TIME, new Punctuator() {
+                @Override
+                public void punctuate(long timestamp) {
+                    throw new KafkaException("KABOOM!");
+                }
+            });
             fail("Should've thrown StreamsException");
         } catch (final StreamsException e) {
             final String message = e.getMessage();
@@ -567,9 +680,9 @@ public class StreamTaskTest {
 
     @Test
     public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception {
-        ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor);
+        ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
         try {
-            task.punctuate(processor, 10);
+            task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
             fail("Should throw illegal state exception as current node is not null");
         } catch (final IllegalStateException e) {
             // pass
@@ -578,27 +691,37 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCallPunctuateOnPassedInProcessorNode() throws Exception {
-        task.punctuate(processor, 5);
-        assertThat(processor.punctuatedAt, equalTo(5L));
-        task.punctuate(processor, 10);
-        assertThat(processor.punctuatedAt, equalTo(10L));
+        task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
+        assertThat(punctuatedAt, equalTo(5L));
+        task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
+        assertThat(punctuatedAt, equalTo(10L));
     }
 
     @Test
     public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() throws Exception {
-        task.punctuate(processor, 5);
+        task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() throws Exception {
-        task.schedule(1);
+        task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                // no-op
+            }
+        });
     }
 
     @Test
-    public void shouldNotThrowIExceptionOnScheduleIfCurrentNodeIsNotNull() throws Exception {
-        ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor);
-        task.schedule(1);
+    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() throws Exception {
+        ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
+        task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                // no-op
+            }
+        });
     }
 
     @SuppressWarnings("unchecked")
@@ -612,7 +735,7 @@ public class StreamTaskTest {
         } catch (final RuntimeException e) {
             task = null;
         }
-        assertTrue(processor.closed);
+        assertTrue(processorStreamTime.closed);
         assertTrue(source1.closed);
         assertTrue(source2.closed);
     }
@@ -780,7 +903,7 @@ public class StreamTaskTest {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processor, source1, source2);
+        final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processorStreamTime, source1, source2);
         final Map<String, SourceNode> sourceNodes = new HashMap() {
             {
                 put(topic1[0], processorNode);

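As exercised by testCancelPunctuateStreamTime and testCancelPunctuateSystemTime above, a schedule can also be cancelled while the task is running, not only at close(). A minimal sketch, not part of this commit, of cancelling after a fixed number of records; the class name, threshold and interval are illustrative only:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

public class CancelAfterWarmupProcessor extends AbstractProcessor<String, String> {

    private Cancellable warmupTicker;
    private long seen = 0;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        warmupTicker = context.schedule(1000L, PunctuationType.SYSTEM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // periodic work while the processor is still warming up
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        if (++seen == 1000) {
            warmupTicker.cancel();  // no further punctuations once enough records have been seen
        }
    }
}
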
http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cb56fa1..515d35d 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -24,6 +24,9 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -167,6 +170,10 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
         return storeMap.get(name);
     }
 
+    @Override public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
+        throw new UnsupportedOperationException("schedule() not supported.");
+    }
+
     @Override
     public void schedule(final long interval) {
         throw new UnsupportedOperationException("schedule() not supported.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 2fe44f0..38b0e7d 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 import java.util.Collections;
@@ -33,7 +35,11 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     public boolean initialized;
 
     public MockProcessorNode(long scheduleInterval) {
-        this(new MockProcessorSupplier<K, V>(scheduleInterval));
+        this(scheduleInterval, PunctuationType.STREAM_TIME);
+    }
+
+    public MockProcessorNode(long scheduleInterval, PunctuationType punctuationType) {
+        this(new MockProcessorSupplier<K, V>(scheduleInterval, punctuationType));
     }
 
     private MockProcessorNode(MockProcessorSupplier<K, V> supplier) {
@@ -54,8 +60,8 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     }
 
     @Override
-    public void punctuate(final long timestamp) {
-        super.punctuate(timestamp);
+    public void punctuate(final long timestamp, final Punctuator punctuator) {
+        super.punctuate(timestamp, punctuator);
         this.punctuatedAt = timestamp;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index 571e084..c464aaa 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -17,9 +17,12 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 
 import java.util.ArrayList;
 
@@ -28,30 +31,57 @@ import static org.junit.Assert.assertEquals;
 public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
 
     public final ArrayList<String> processed = new ArrayList<>();
-    public final ArrayList<Long> punctuated = new ArrayList<>();
+    public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
+    public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
 
     private final long scheduleInterval;
+    private final PunctuationType punctuationType;
+    public Cancellable scheduleCancellable;
 
     public MockProcessorSupplier() {
         this(-1L);
     }
 
     public MockProcessorSupplier(long scheduleInterval) {
+        this(scheduleInterval, PunctuationType.STREAM_TIME);
+    }
+
+    public MockProcessorSupplier(long scheduleInterval, PunctuationType punctuationType) {
         this.scheduleInterval = scheduleInterval;
+        this.punctuationType = punctuationType;
     }
 
     @Override
     public Processor<K, V> get() {
-        return new MockProcessor();
+        return new MockProcessor(punctuationType);
     }
 
     public class MockProcessor extends AbstractProcessor<K, V> {
 
+        PunctuationType punctuationType;
+
+        public MockProcessor(PunctuationType punctuationType) {
+            this.punctuationType = punctuationType;
+        }
+
         @Override
         public void init(ProcessorContext context) {
             super.init(context);
-            if (scheduleInterval > 0L)
-                context.schedule(scheduleInterval);
+            if (scheduleInterval > 0L) {
+                scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() {
+                    @Override
+                    public void punctuate(long timestamp) {
+                        if (punctuationType == PunctuationType.STREAM_TIME) {
+                            assertEquals(timestamp, context().timestamp());
+                        }
+                        assertEquals(-1, context().partition());
+                        assertEquals(-1L, context().offset());
+
+                        (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
+                           .add(timestamp);
+                    }
+                });
+            }
         }
 
         @Override
@@ -60,15 +90,6 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
                     (value == null ? "null" : value));
 
         }
-
-        @Override
-        public void punctuate(long streamTime) {
-            assertEquals(streamTime, context().timestamp());
-            assertEquals(-1, context().partition());
-            assertEquals(-1L, context().offset());
-
-            punctuated.add(streamTime);
-        }
     }
 
     public void checkAndClearProcessResult(String... expected) {
@@ -86,7 +107,8 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
         processed.clear();
     }
 
-    public void checkAndClearPunctuateResult(long... expected) {
+    public void checkAndClearPunctuateResult(PunctuationType type, long... expected) {
+        ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
         assertEquals("the number of outputs:", expected.length, punctuated.size());
 
         for (int i = 0; i < expected.length; i++) {

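The MockProcessorSupplier change above mirrors the migration expected of user code: the one-argument schedule() plus the Processor#punctuate() callback are retained but deprecated by this commit (see the @SuppressWarnings("deprecation") and the ...PunctuatingDeprecated test earlier), and both map onto a single call to the new API. A sketch, with the interval and callback body purely illustrative:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;

public class MigratedProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);

        // Old style (deprecated by this commit): callbacks arrive via punctuate(long),
        // always driven by stream time:
        //     context.schedule(1000L);
        //     ... and elsewhere: public void punctuate(long streamTime) { ... }

        // New style: the callback and the time domain are passed explicitly.
        context.schedule(1000L, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(final long timestamp) {
                // work formerly done in Processor#punctuate(long)
            }
        });
    }

    @Override
    public void process(final String key, final String value) { }
}
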
http://git-wip-us.apache.org/repos/asf/kafka/blob/efb060c5/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 8e399c5..1b9cfed 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -18,6 +18,9 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -48,9 +51,12 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
         return null;
     }
 
+    @Override public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
+        return null;
+    }
+
     @Override
     public void schedule(final long interval) {
-
     }
 
     @Override

