kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: KIP-138 renaming of string names
Date Thu, 07 Sep 2017 01:39:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3f155eaa2 -> 9b85cf9ed


MINOR: KIP-138 renaming of string names

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Damian Guy <damian.guy@gmail.com>

Closes #3796 from guozhangwang/kip-138-minor-renames


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b85cf9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b85cf9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b85cf9e

Branch: refs/heads/trunk
Commit: 9b85cf9ed02d4b3b87ea39e0c2f8c35e1e813f2f
Parents: 3f155ea
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Sep 6 18:39:40 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Sep 6 18:39:40 2017 -0700

----------------------------------------------------------------------
 docs/streams/developer-guide.html                            | 2 +-
 docs/streams/upgrade-guide.html                              | 4 ++--
 .../main/java/org/apache/kafka/streams/kstream/KStream.java  | 6 +++---
 .../org/apache/kafka/streams/processor/ProcessorContext.java | 4 ++--
 .../org/apache/kafka/streams/processor/PunctuationType.java  | 4 ++--
 .../apache/kafka/streams/processor/internals/StreamTask.java | 4 ++--
 .../kafka/streams/processor/internals/StreamTaskTest.java    | 8 ++++----
 7 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9b85cf9e/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 8433bf3..99467a7 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -115,7 +115,7 @@
     this.context = context;
 
     // schedule a punctuation method every 1000 milliseconds.
-    this.context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator() {
+    this.context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
         @Override
         public void punctuate(long timestamp) {
             KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b85cf9e/docs/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 9b04a59..a868c91 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -95,8 +95,8 @@
     <p>
         Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>)
and hence the <code>punctuate</code> function was data-driven only because stream
time is determined (and advanced forward) by the timestamps derived from the input data.
         If there is no data arriving at the processor, the stream time would not advance
and hence punctuation will not be triggered.
-        On the other hand, When wall-clock time (i.e. <code>PunctuationType.SYSTEM_TIME</code>)
is used, <code>punctuate</code> will be triggered purely based on wall-clock time.
-        So for example if the <code>Punctuator</code> function is scheduled based
on <code>PunctuationType.SYSTEM_TIME</code>, if these 60 records were processed
within 20 seconds,
+        On the other hand, When wall-clock time (i.e. <code>PunctuationType.WALL_CLOCK_TIME</code>)
is used, <code>punctuate</code> will be triggered purely based on wall-clock time.
+        So for example if the <code>Punctuator</code> function is scheduled based
on <code>PunctuationType.WALL_CLOCK_TIME</code>, if these 60 records were processed
within 20 seconds,
         <code>punctuate</code> would be called 2 times (one time every 10 seconds);
         if these 60 records were processed within 5 seconds, then no <code>punctuate</code>
would be called at all.
         Users can schedule multiple <code>Punctuator</code> callbacks with different
<code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code>
multiple times inside processor's <code>init()</code> method.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b85cf9e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 8301cba..b8b5b8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -830,7 +830,7 @@ public interface KStream<K, V> {
      *                 this.state = context.getStateStore("myTransformState");
      *                 // punctuate each 1000ms; can access this.state
      *                 // can emit as many new KeyValue pairs as required via this.context#forward()
-     *                 context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..));
+     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
      *             }
      *
      *             KeyValue transform(K key, V value) {
@@ -903,7 +903,7 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myValueTransformState");
-     *                 context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..));
// punctuate each 1000ms, can access this.state
+     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
// punctuate each 1000ms, can access this.state
      *             }
      *
      *             NewValueType transform(V value) {
@@ -969,7 +969,7 @@ public interface KStream<K, V> {
      *
      *             void init(ProcessorContext context) {
      *                 this.state = context.getStateStore("myProcessorState");
-     *                 context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..));
// punctuate each 1000ms, can access this.state
+     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
// punctuate each 1000ms, can access this.state
      *             }
      *
      *             void process(K key, V value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b85cf9e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index eef7e20..cdf1612 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -96,14 +96,14 @@ public interface ProcessorContext {
      *   <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced
by the processing of messages
      *   in accordance with the timestamp as extracted by the {@link TimestampExtractor}
in use.
      *   <b>NOTE:</b> Only advanced if messages arrive</li>
-     *   <li>{@link PunctuationType#SYSTEM_TIME} - uses system time (the wall-clock
time),
+     *   <li>{@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the wall-clock
time),
      *   which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
      *   independent of whether new messages arrive. <b>NOTE:</b> This is best
effort only as its granularity is limited
      *   by how long an iteration of the processing loop takes to complete</li>
      * </ul>
      *
      * @param interval the time interval between punctuations
-     * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#SYSTEM_TIME}
+     * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
      * @param callback a function consuming timestamps representing the current stream or
system time
      * @return a handle allowing cancellation of the punctuation schedule established by
this method
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b85cf9e/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java
b/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java
index 4dd9300..bc0003d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java
@@ -22,7 +22,7 @@ package org.apache.kafka.streams.processor;
  *   <li>STREAM_TIME - uses "stream time", which is advanced by the processing of messages
  *   in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
  *   <b>NOTE:</b> Only advanced if messages arrive</li>
- *   <li>SYSTEM_TIME - uses system time (the wall-clock time),
+ *   <li>WALL_CLOCK_TIME - uses system time (the wall-clock time),
  *   which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
  *   independent of whether new messages arrive. <b>NOTE:</b> This is best effort
only as its granularity is limited
  *   by how long an iteration of the processing loop takes to complete</li>
@@ -30,5 +30,5 @@ package org.apache.kafka.streams.processor;
  */
 public enum PunctuationType {
    STREAM_TIME,
-   SYSTEM_TIME,
+   WALL_CLOCK_TIME,
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b85cf9e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 1fe567e..288a597 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -524,7 +524,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         switch (type) {
             case STREAM_TIME:
                 return streamTimePunctuationQueue.schedule(schedule);
-            case SYSTEM_TIME:
+            case WALL_CLOCK_TIME:
                 return systemTimePunctuationQueue.schedule(schedule);
             default:
                 throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
@@ -563,7 +563,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     public boolean maybePunctuateSystemTime() {
         final long timestamp = time.milliseconds();
 
-        return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.SYSTEM_TIME,
this);
+        return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME,
this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b85cf9e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 61d7875..a9d43ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -96,7 +96,7 @@ public class StreamTaskTest {
     private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(topic1,
intDeserializer, intDeserializer);
     private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(topic2,
intDeserializer, intDeserializer);
     private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10L);
-    private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L,
PunctuationType.SYSTEM_TIME);
+    private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L,
PunctuationType.WALL_CLOCK_TIME);
 
     private final ProcessorTopology topology = new ProcessorTopology(
             Arrays.<ProcessorNode>asList(source1, source2, processorStreamTime, processorSystemTime),
@@ -405,7 +405,7 @@ public class StreamTaskTest {
         assertTrue(task.maybePunctuateSystemTime());
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME,
now + 10, now + 20, now + 30);
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 10, now + 20, now + 30);
     }
 
     @Test
@@ -414,7 +414,7 @@ public class StreamTaskTest {
         assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
         time.sleep(9);
         assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME,
now);
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now);
     }
 
     @Test
@@ -425,7 +425,7 @@ public class StreamTaskTest {
         processorSystemTime.supplier.scheduleCancellable.cancel();
         time.sleep(10);
         assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME,
now + 10);
+        processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 10);
     }
 
     @SuppressWarnings("unchecked")


Mime
View raw message