kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-6179: Clear min timestamp tracker upon partition queue cleanup
Date Wed, 08 Nov 2017 23:07:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 c9d1fc130 -> 1a0c00698


KAFKA-6179: Clear min timestamp tracker upon partition queue cleanup

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
Damian Guy <damian.guy@gmail.com>

Closes #4186 from guozhangwang/K6179-cleanup-timestamp-tracker-on-clear

(cherry picked from commit ee1aaa091fc68587635604de5006b7acdb160361)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a0c0069
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a0c0069
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a0c0069

Branch: refs/heads/0.11.0
Commit: 1a0c006983698cac5403c9234732c4f4ba6df2c3
Parents: c9d1fc1
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Nov 8 15:07:14 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 8 15:07:35 2017 -0800

----------------------------------------------------------------------
 .../internals/MinTimestampTracker.java          |  4 ++++
 .../processor/internals/RecordQueue.java        | 13 +++++++++-
 .../processor/internals/TimestampTracker.java   |  6 ++++-
 .../processor/internals/RecordQueueTest.java    | 25 ++++++++++++++++++++
 4 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a0c0069/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
index 17648e3..df35c3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -77,4 +77,8 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
             return stamped.timestamp;
     }
 
+    public void clear() {
+        lastKnownTime = NOT_KNOWN;
+        ascendingSubsequence.clear();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a0c0069/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 0902614..2b7a82e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -157,9 +157,20 @@ public class RecordQueue {
     }
 
     /**
-     * Clear the fifo queue of its elements
+     * Clear the fifo queue of its elements, also clear the time tracker's kept stamped elements
      */
     public void clear() {
         fifoQueue.clear();
+        timeTracker.clear();
+        partitionTime = TimestampTracker.NOT_KNOWN;
+    }
+
+    /*
+     * Returns the timestamp tracker of the record queue
+     *
+     * This is only used for testing
+     */
+    TimestampTracker<ConsumerRecord<Object, Object>> timeTracker() {
+        return timeTracker;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a0c0069/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
index 9d56b96..30c816d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
@@ -24,7 +24,7 @@ package org.apache.kafka.streams.processor.internals;
  */
 public interface TimestampTracker<E> {
 
-    static final long NOT_KNOWN = -1L;
+    long NOT_KNOWN = -1L;
 
     /**
      * Adds a stamped elements to this tracker.
@@ -54,4 +54,8 @@ public interface TimestampTracker<E> {
      */
     int size();
 
+    /**
+     * Empty the tracker by removing any tracked stamped elements
+     */
+    void clear();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a0c0069/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index f80b7eb..9aaddcb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -55,6 +55,8 @@ public class RecordQueueTest {
     public void testTimeTracking() {
 
         assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
 
         // add three 3 out-of-order records with timestamp 2, 1, 3
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -66,16 +68,19 @@ public class RecordQueueTest {
 
         assertEquals(3, queue.size());
         assertEquals(1L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
         assertEquals(2, queue.size());
         assertEquals(1L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         // poll the second record, now with 3
         assertEquals(1L, queue.poll().timestamp);
         assertEquals(1, queue.size());
         assertEquals(3L, queue.timestamp());
+        assertEquals(1, queue.timeTracker().size());
 
         // add three 3 out-of-order records with timestamp 4, 1, 2
         // now with 3, 4, 1, 2
@@ -88,22 +93,28 @@ public class RecordQueueTest {
 
         assertEquals(4, queue.size());
         assertEquals(3L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         // poll the third record, now with 4, 1, 2
         assertEquals(3L, queue.poll().timestamp);
         assertEquals(3, queue.size());
         assertEquals(3L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         // poll the rest records
         assertEquals(4L, queue.poll().timestamp);
         assertEquals(3L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         assertEquals(1L, queue.poll().timestamp);
         assertEquals(3L, queue.timestamp());
+        assertEquals(1, queue.timeTracker().size());
 
         assertEquals(2L, queue.poll().timestamp);
+        assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
         assertEquals(3L, queue.timestamp());
+        assertEquals(0, queue.timeTracker().size());
 
         // add three more records with 4, 5, 6
         List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -120,6 +131,20 @@ public class RecordQueueTest {
         assertEquals(4L, queue.poll().timestamp);
         assertEquals(2, queue.size());
         assertEquals(5L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
+
+        // clear the queue
+        queue.clear();
+        assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(0, queue.timeTracker().size());
+        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
+
+        // re-insert the three records with 4, 5, 6
+        queue.addRawRecords(list3);
+
+        assertEquals(3, queue.size());
+        assertEquals(4L, queue.timestamp());
     }
 
     @Test(expected = StreamsException.class)


Mime
View raw message