kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Reset `streamTime` on clear (#8250)
Date Sat, 07 Mar 2020 16:23:19 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f081f3b  MINOR: Reset `streamTime` on clear (#8250)
f081f3b is described below

commit f081f3bd7619194e567daaa53b52216b75925adf
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Sat Mar 7 08:22:55 2020 -0800

    MINOR: Reset `streamTime` on clear (#8250)
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../apache/kafka/streams/processor/internals/PartitionGroup.java   | 7 +++----
 .../org/apache/kafka/streams/processor/internals/RecordQueue.java  | 5 ++---
 .../kafka/streams/processor/internals/PartitionGroupTest.java      | 2 +-
 3 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 32610b5..4514d8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -220,15 +220,14 @@ public class PartitionGroup {
 
     void close() {
         clear();
-
-        streamTime = RecordQueue.UNKNOWN;
     }
 
     void clear() {
-        nonEmptyQueuesByTime.clear();
-        totalBuffered = 0;
         for (final RecordQueue queue : partitionQueues.values()) {
             queue.clear();
         }
+        nonEmptyQueuesByTime.clear();
+        totalBuffered = 0;
+        streamTime = RecordQueue.UNKNOWN;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 0beade9..dff2f44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -47,7 +47,7 @@ public class RecordQueue {
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
     private StampedRecord headRecord = null;
-    private long partitionTime;
+    private long partitionTime = UNKNOWN;
 
     private final Sensor droppedRecordsSensor;
 
@@ -74,7 +74,6 @@ public class RecordQueue {
             droppedRecordsSensor
         );
         this.log = logContext.logger(RecordQueue.class);
-        setPartitionTime(UNKNOWN);
     }
 
     void setPartitionTime(final long partitionTime) {
@@ -167,7 +166,7 @@ public class RecordQueue {
     public void clear() {
         fifoQueue.clear();
         headRecord = null;
-        setPartitionTime(UNKNOWN);
+        partitionTime = UNKNOWN;
     }
 
     private void updateHead() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index d38c52d..40e6451 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -392,7 +392,7 @@ public class PartitionGroupTest {
         group.clear();
 
         assertThat(group.numBuffered(), equalTo(0));
-        assertThat(group.streamTime(), equalTo(3L));
+        assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN));
         assertThat(group.nextRecord(new PartitionGroup.RecordInfo()), equalTo(null));
         assertThat(group.partitionTimestamp(partition1), equalTo(RecordQueue.UNKNOWN));
 


Mime
View raw message