kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: special handling first ever triggered punctuate
Date Mon, 11 Apr 2016 06:09:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7c2798986 -> c76b6e6d9


HOTFIX: special handling first ever triggered punctuate

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Anna Povzner <anna@confluent.io>

Closes #1208 from guozhangwang/KPunctuate


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c76b6e6d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c76b6e6d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c76b6e6d

Branch: refs/heads/trunk
Commit: c76b6e6d9bad2278076054f5175a2b053383388f
Parents: 7c27989
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Sun Apr 10 23:09:43 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Apr 10 23:09:43 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/PunctuationQueue.java  |  2 +-
 .../processor/internals/PunctuationSchedule.java       | 13 +++++++++----
 .../streams/processor/internals/StreamTaskTest.java    | 10 +++++-----
 3 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index d7d7eee..824e20a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -43,7 +43,7 @@ public class PunctuationQueue {
                 PunctuationSchedule sched = top;
                 pq.poll();
                 punctuator.punctuate(sched.node(), timestamp);
-                pq.add(sched.next());
+                pq.add(sched.next(timestamp));
                 punctuated = true;
 
                 top = pq.peek();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index 758cfb0..98919d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -22,11 +22,11 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
     final long interval;
 
     public PunctuationSchedule(ProcessorNode node, long interval) {
-        this(node, 0, interval);
+        this(node, 0L, interval);
     }
 
     public PunctuationSchedule(ProcessorNode node, long time, long interval) {
-        super(node, time + interval);
+        super(node, time);
         this.interval = interval;
     }
 
@@ -34,8 +34,13 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
         return value;
     }
 
-    public PunctuationSchedule next() {
-        return new PunctuationSchedule(value, timestamp, interval);
+    public PunctuationSchedule next(long currTimestamp) {
+        // the first time this schedule is triggered its timestamp is still the initial 0L,
+        // so in that case we reschedule relative to currTimestamp rather than the stored timestamp
+        if (timestamp == 0L)
+            return new PunctuationSchedule(value, currTimestamp + interval, interval);
+        else
+            return new PunctuationSchedule(value, timestamp + interval, interval);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index dd48947..6014c36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -226,15 +226,15 @@ public class StreamTaskTest {
             StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                     new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             assertTrue(task.maybePunctuate());
@@ -275,7 +275,7 @@ public class StreamTaskTest {
 
             assertFalse(task.maybePunctuate());
 
-            processor.supplier.checkAndClearPunctuateResult(10L, 20L, 30L);
+            processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
 
             task.close();
 


Mime
View raw message