Repository: kafka
Updated Branches:
refs/heads/trunk 7c2798986 -> c76b6e6d9
HOTFIX: special handling first ever triggered punctuate
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Anna Povzner <anna@confluent.io>
Closes #1208 from guozhangwang/KPunctuate
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c76b6e6d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c76b6e6d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c76b6e6d
Branch: refs/heads/trunk
Commit: c76b6e6d9bad2278076054f5175a2b053383388f
Parents: 7c27989
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Sun Apr 10 23:09:43 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Apr 10 23:09:43 2016 -0700
----------------------------------------------------------------------
.../streams/processor/internals/PunctuationQueue.java | 2 +-
.../processor/internals/PunctuationSchedule.java | 13 +++++++++----
.../streams/processor/internals/StreamTaskTest.java | 10 +++++-----
3 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index d7d7eee..824e20a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -43,7 +43,7 @@ public class PunctuationQueue {
PunctuationSchedule sched = top;
pq.poll();
punctuator.punctuate(sched.node(), timestamp);
- pq.add(sched.next());
+ pq.add(sched.next(timestamp));
punctuated = true;
top = pq.peek();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index 758cfb0..98919d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -22,11 +22,11 @@ public class PunctuationSchedule extends Stamped<ProcessorNode>
{
final long interval;
public PunctuationSchedule(ProcessorNode node, long interval) {
- this(node, 0, interval);
+ this(node, 0L, interval);
}
public PunctuationSchedule(ProcessorNode node, long time, long interval) {
- super(node, time + interval);
+ super(node, time);
this.interval = interval;
}
@@ -34,8 +34,13 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
return value;
}
- public PunctuationSchedule next() {
- return new PunctuationSchedule(value, timestamp, interval);
+ public PunctuationSchedule next(long currTimestamp) {
+ // we need to special handle the case when it is firstly triggered (i.e. the timestamp
+ // is equal to the interval) by reschedule based on the currTimestamp
+ if (timestamp == 0L)
+ return new PunctuationSchedule(value, currTimestamp + interval, interval);
+ else
+ return new PunctuationSchedule(value, timestamp + interval, interval);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index dd48947..6014c36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -226,15 +226,15 @@ public class StreamTaskTest {
StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
- new ConsumerRecord<>(partition1.topic(), partition1.partition(),
10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(),
20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(),
30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(),
30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(),
40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
task.addRecords(partition2, records(
- new ConsumerRecord<>(partition2.topic(), partition2.partition(),
15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(),
25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(),
35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(),
35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(),
45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
assertTrue(task.maybePunctuate());
@@ -275,7 +275,7 @@ public class StreamTaskTest {
assertFalse(task.maybePunctuate());
- processor.supplier.checkAndClearPunctuateResult(10L, 20L, 30L);
+ processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
task.close();
|