kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5144: renamed variables in MinTimestampTracker and added comments
Date Wed, 03 May 2017 20:43:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cfb238674 -> bc65c62e6


KAFKA-5144: renamed variables in MinTimestampTracker and added comments

The descendingSubsequence is a misnomer. The linked list is actually arranged so that the
lowest timestamp is first and larger timestamps are added to the end, therefore renamed to
ascendingSubsequence.
The minElem variable was also misnamed. It's actually the current maximum element as it's
taken from the end of the list.
Added comment to get() to make it clear it's returning the lowest timestamp.

Author: mihbor <mihbor@users.noreply.github.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2948 from mihbor/patch-4


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc65c62e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc65c62e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc65c62e

Branch: refs/heads/trunk
Commit: bc65c62e661c2829a30ad6b08fc9fa34303ad386
Parents: cfb2386
Author: Michal Borowiecki <mbor81@gmail.com>
Authored: Wed May 3 13:43:20 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 3 13:43:20 2017 -0700

----------------------------------------------------------------------
 .../internals/MinTimestampTracker.java          | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc65c62e/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
index a67675c..17648e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -24,7 +24,8 @@ import java.util.LinkedList;
  */
 public class MinTimestampTracker<E> implements TimestampTracker<E> {
 
-    private final LinkedList<Stamped<E>> descendingSubsequence = new LinkedList<>();
+    // first element has the lowest timestamp and last element the highest
+    private final LinkedList<Stamped<E>> ascendingSubsequence = new LinkedList<>();
 
     // in the case that incoming traffic is very small, the records maybe put and polled
     // within a single iteration, in this case we need to remember the last polled
@@ -37,12 +38,12 @@ public class MinTimestampTracker<E> implements TimestampTracker<E>
{
     public void addElement(final Stamped<E> elem) {
         if (elem == null) throw new NullPointerException();
 
-        Stamped<E> minElem = descendingSubsequence.peekLast();
-        while (minElem != null && minElem.timestamp >= elem.timestamp) {
-            descendingSubsequence.removeLast();
-            minElem = descendingSubsequence.peekLast();
+        Stamped<E> maxElem = ascendingSubsequence.peekLast();
+        while (maxElem != null && maxElem.timestamp >= elem.timestamp) {
+            ascendingSubsequence.removeLast();
+            maxElem = ascendingSubsequence.peekLast();
         }
-        descendingSubsequence.offerLast(elem);
+        ascendingSubsequence.offerLast(elem); //lower timestamps have been retained and all
greater/equal removed
     }
 
     public void removeElement(final Stamped<E> elem) {
@@ -50,22 +51,25 @@ public class MinTimestampTracker<E> implements TimestampTracker<E>
{
             return;
         }
 
-        if (descendingSubsequence.peekFirst() == elem) {
-            descendingSubsequence.removeFirst();
+        if (ascendingSubsequence.peekFirst() == elem) {
+            ascendingSubsequence.removeFirst();
         }
 
-        if (descendingSubsequence.isEmpty()) {
+        if (ascendingSubsequence.isEmpty()) {
             lastKnownTime = elem.timestamp;
         }
 
     }
 
     public int size() {
-        return descendingSubsequence.size();
+        return ascendingSubsequence.size();
     }
 
+    /**
+     * @return the lowest tracked timestamp
+     */
     public long get() {
-        Stamped<E> stamped = descendingSubsequence.peekFirst();
+        Stamped<E> stamped = ascendingSubsequence.peekFirst();
 
         if (stamped == null)
             return lastKnownTime;
@@ -73,4 +77,4 @@ public class MinTimestampTracker<E> implements TimestampTracker<E>
{
             return stamped.timestamp;
     }
 
-}
\ No newline at end of file
+}


Mime
View raw message