kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8347: Choose next record to process by timestamp (#6719)
Date Fri, 17 May 2019 21:27:09 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 722e854  KAFKA-8347: Choose next record to process by timestamp (#6719)
722e854 is described below

commit 722e854bfea3586e9366981f7b4a794e048a2cbc
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Thu May 16 07:07:54 2019 -0700

    KAFKA-8347: Choose next record to process by timestamp (#6719)
    
    When choosing the next record to process, we should look at the timestamp of each partition's head record and pick the partition whose head record has the lowest timestamp, rather than picking the partition with the lowest stream time.
    
    This change makes RecordQueue.timestamp() return the timestamp of the current head record rather than the partition's stream time. The stream-time field (partitionTime) is removed from RecordQueue, since it was only tracked in order to choose the next partition to poll from.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>,  Bill Bejeck <bbejeck@gmail.com>
---
 .../streams/processor/internals/RecordQueue.java   | 17 ++----
 .../processor/internals/PartitionGroupTest.java    | 70 ++++++++++++++++++----
 .../processor/internals/RecordQueueTest.java       |  8 +--
 3 files changed, 68 insertions(+), 27 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 572e629..7f3c08d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -46,7 +46,6 @@ public class RecordQueue {
     private final RecordDeserializer recordDeserializer;
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
-    private long partitionTime = UNKNOWN;
     private StampedRecord headRecord = null;
 
     RecordQueue(final TopicPartition partition,
@@ -98,7 +97,7 @@ public class RecordQueue {
             fifoQueue.addLast(rawRecord);
         }
 
-        maybeUpdateTimestamp();
+        updateHead();
 
         return size();
     }
@@ -112,7 +111,7 @@ public class RecordQueue {
         final StampedRecord recordToReturn = headRecord;
         headRecord = null;
 
-        maybeUpdateTimestamp();
+        updateHead();
 
         return recordToReturn;
     }
@@ -142,7 +141,7 @@ public class RecordQueue {
      * @return timestamp
      */
     public long timestamp() {
-        return partitionTime;
+        return headRecord == null ? UNKNOWN : headRecord.timestamp;
     }
 
     /**
@@ -151,10 +150,9 @@ public class RecordQueue {
     public void clear() {
         fifoQueue.clear();
         headRecord = null;
-        partitionTime = UNKNOWN;
     }
 
-    private void maybeUpdateTimestamp() {
+    private void updateHead() {
         while (headRecord == null && !fifoQueue.isEmpty()) {
             final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst();
             final ConsumerRecord<Object, Object> deserialized = recordDeserializer.deserialize(processorContext,
raw);
@@ -166,7 +164,7 @@ public class RecordQueue {
 
             final long timestamp;
             try {
-                timestamp = timestampExtractor.extract(deserialized, partitionTime);
+                timestamp = timestampExtractor.extract(deserialized, timestamp());
             } catch (final StreamsException internalFatalExtractorException) {
                 throw internalFatalExtractorException;
             } catch (final Exception fatalUserException) {
@@ -187,11 +185,6 @@ public class RecordQueue {
             }
 
             headRecord = new StampedRecord(deserialized, timestamp);
-
-            // update the partition timestamp if the current head record's timestamp has
exceed its value
-            if (timestamp > partitionTime) {
-                partitionTime = timestamp;
-            }
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index c84bbc2..6b95bdf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -117,7 +117,7 @@ public class PartitionGroupTest {
         record = group.nextRecord(info);
         // 1:[3, 5]
         // 2:[2, 4, 6]
-        // st: 2
+        // st: 1
         assertEquals(partition1, info.partition());
         verifyTimes(record, 1L, 1L);
         verifyBuffered(5, 2, 3);
@@ -127,7 +127,7 @@ public class PartitionGroupTest {
         record = group.nextRecord(info);
         // 1:[3, 5]
         // 2:[4, 6]
-        // st: 3
+        // st: 2
         assertEquals(partition2, info.partition());
         verifyTimes(record, 2L, 2L);
         verifyBuffered(4, 2, 2);
@@ -141,32 +141,32 @@ public class PartitionGroupTest {
         group.addRawRecords(partition1, list3);
         // 1:[3, 5, 2, 4]
         // 2:[4, 6]
-        // st: 3 (non-decreasing, so adding 2 doesn't change it)
+        // st: 2 (just adding records shouldn't change it)
         verifyBuffered(6, 4, 2);
         assertEquals(2L, group.timestamp());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
-        // get one record, time should not be advanced
+        // get one record, time should be advanced
         record = group.nextRecord(info);
         // 1:[5, 2, 4]
         // 2:[4, 6]
-        // st: 4 as partition st is now {5, 4}
+        // st: 3
         assertEquals(partition1, info.partition());
         verifyTimes(record, 3L, 3L);
         verifyBuffered(5, 3, 2);
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
-        // get one record, time should not be advanced
+        // get one record, time should be advanced
         record = group.nextRecord(info);
         // 1:[5, 2, 4]
         // 2:[6]
-        // st: 5 as partition st is now {5, 6}
+        // st: 4
         assertEquals(partition2, info.partition());
         verifyTimes(record, 4L, 4L);
         verifyBuffered(4, 3, 1);
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
-        // get one more record, now time should be advanced
+        // get one more record, time should be advanced
         record = group.nextRecord(info);
         // 1:[2, 4]
         // 2:[6]
@@ -190,17 +190,17 @@ public class PartitionGroupTest {
         record = group.nextRecord(info);
         // 1:[]
         // 2:[6]
-        // st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known
time of 4)
+        // st: 5
         assertEquals(partition1, info.partition());
         verifyTimes(record, 4L, 5L);
         verifyBuffered(1, 0, 1);
         assertEquals(1.0, metrics.metric(lastLatenessValue).metricValue());
 
-        // get one more record, time should not be advanced
+        // get one more record, time should be advanced
         record = group.nextRecord(info);
         // 1:[]
         // 2:[]
-        // st: 4 (1 and 2 are empty, so they are still reporting the last-known times of
4 and 6.)
+        // st: 6
         assertEquals(partition2, info.partition());
         verifyTimes(record, 6L, 6L);
         verifyBuffered(0, 0, 0);
@@ -208,6 +208,54 @@ public class PartitionGroupTest {
 
     }
 
+    @Test
+    public void shouldChooseNextRecordBasedOnHeadTimestamp() {
+        assertEquals(0, group.numBuffered());
+
+        // add three 3 records with timestamp 1, 5, 3 to partition-1
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue));
+
+        group.addRawRecords(partition1, list1);
+
+        verifyBuffered(3, 3, 0);
+        assertEquals(-1L, group.timestamp());
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
+
+        StampedRecord record;
+        final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
+
+        // get first two records from partition 1
+        record = group.nextRecord(info);
+        assertEquals(record.timestamp, 1L);
+        record = group.nextRecord(info);
+        assertEquals(record.timestamp, 5L);
+
+        // add three 3 records with timestamp 2, 4, 6 to partition-2
+        final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
+            new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
+
+        group.addRawRecords(partition2, list2);
+        // 1:[3]
+        // 2:[2, 4, 6]
+
+        // get one record, next record should be ts=2 from partition 2
+        record = group.nextRecord(info);
+        // 1:[3]
+        // 2:[4, 6]
+        assertEquals(record.timestamp, 2L);
+
+        // get one record, next up should have ts=3 from partition 1 (even though it has
seen a larger max timestamp =5)
+        record = group.nextRecord(info);
+        // 1:[]
+        // 2:[4, 6]
+        assertEquals(record.timestamp, 3L);
+    }
+
     private void verifyTimes(final StampedRecord record, final long recordTime, final long
streamTime) {
         assertEquals(recordTime, record.timestamp);
         assertEquals(streamTime, group.timestamp());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index b91aba5..c16cb2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -117,7 +117,7 @@ public class RecordQueueTest {
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(2L, queue.timestamp());
+        assertEquals(1L, queue.timestamp());
 
         // poll the second record, now with 3
         assertEquals(1L, queue.poll().timestamp);
@@ -143,15 +143,15 @@ public class RecordQueueTest {
 
         // poll the rest records
         assertEquals(4L, queue.poll().timestamp);
-        assertEquals(4L, queue.timestamp());
+        assertEquals(1L, queue.timestamp());
 
         assertEquals(1L, queue.poll().timestamp);
-        assertEquals(4L, queue.timestamp());
+        assertEquals(2L, queue.timestamp());
 
         assertEquals(2L, queue.poll().timestamp);
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
 
         // add three more records with 4, 5, 6
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(


Mime
View raw message