kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/4] kafka git commit: KAFKA-3716; Validate all timestamps are not negative
Date Sat, 18 Jun 2016 18:52:23 GMT
KAFKA-3716; Validate all timestamps are not negative

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Eno Thereska, Ismael Juma

Closes #1393 from guozhangwang/K3716-check-non-negative-timestamps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73949c28
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73949c28
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73949c28

Branch: refs/heads/0.10.0
Commit: 73949c288d43ed848550c1b5ba6cbbf297b110b8
Parents: c052002
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue May 17 11:25:49 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Jun 18 11:46:08 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/streams/kstream/Windows.java   | 6 +-----
 .../apache/kafka/streams/processor/internals/RecordQueue.java | 5 +++++
 .../apache/kafka/streams/processor/internals/StreamTask.java  | 7 ++++++-
 3 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/73949c28/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 06cacb4..c64a80f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -17,9 +17,7 @@
 
 package org.apache.kafka.streams.kstream;
 
-
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * The window specification interface that can be extended for windowing operation in joins and aggregations.
@@ -32,8 +30,6 @@ public abstract class Windows<W extends Window> {
 
     private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
 
-    private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
-
     protected String name;
 
     private long maintainDurationMs;
@@ -86,7 +82,7 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Creates all windows that contain the provided timestamp.
+     * Creates all windows that contain the provided timestamp, indexed by non-negative window start timestamps.
      *
      * @param timestamp  the timestamp window should get created for
      * @return  a map of {@code windowStartTimestamp -> Window} entries

http://git-wip-us.apache.org/repos/asf/kafka/blob/73949c28/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 6911a45..7e5baf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.ArrayDeque;
@@ -84,6 +85,10 @@ public class RecordQueue {
                                                                          rawRecord.serializedValueSize(), key, value);
             long timestamp = timestampExtractor.extract(record);
 
+            // validate that timestamp must be non-negative
+            if (timestamp < 0)
+                throw new StreamsException("Extracted timestamp value is negative, which is not allowed.");
+
             StampedRecord stampedRecord = new StampedRecord(record, timestamp);
 
             fifoQueue.addLast(stampedRecord);

http://git-wip-us.apache.org/repos/asf/kafka/blob/73949c28/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d9efb6d..e7e24fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -209,7 +209,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
     public boolean maybePunctuate() {
         long timestamp = partitionGroup.timestamp();
 
-        return punctuationQueue.mayPunctuate(timestamp, this);
+        // if the timestamp is not known yet, meaning there is not enough data accumulated
+        // to reason stream partition time, then skip.
+        if (timestamp == TimestampTracker.NOT_KNOWN)
+            return false;
+        else
+            return punctuationQueue.mayPunctuate(timestamp, this);
     }
 
     /**


Mime
View raw message