This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9c8501f MINOR: Record all poll invocations (#9234)
9c8501f is described below
commit 9c8501f5f0d4c1f8f4928ea98c889954f0e06cf8
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Thu Sep 3 14:18:45 2020 -0500
MINOR: Record all poll invocations (#9234)
Record the pollSensor after every invocation of poll, rather than only when records are
returned, so that we can accurately gauge how often we're invoking Consumer#poll.
Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@apache.org>,
Matthias J. Sax <mjsax@apache.org>
---
.../java/org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2c39e1f..547458c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -636,8 +636,8 @@ public class StreamThread extends Thread {
         final long pollLatency = advanceNowAndComputeLatency();
+        pollSensor.record(pollLatency, now);
         if (records != null && !records.isEmpty()) {
-            pollSensor.record(pollLatency, now);
             pollRecordsSensor.record(records.count(), now);
             taskManager.addRecordsToTasks(records);
         }
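
The effect of moving the pollSensor.record call out of the if block can be illustrated with a small standalone sketch. It does not use the Kafka metrics classes: CountingSensor, recordOnlyNonEmpty, and recordEveryPoll are hypothetical names introduced here, and the latency value is a placeholder. The sketch only shows that recording inside the non-empty check counts fewer poll invocations than actually happened.

```java
import java.util.ArrayList;
import java.util.List;

public class PollSensorSketch {
    // Hypothetical stand-in for a metrics sensor (NOT Kafka's Sensor class);
    // it only remembers the values it was asked to record.
    static final class CountingSensor {
        private final List<Long> values = new ArrayList<>();

        void record(long value) {
            values.add(value);
        }

        int count() {
            return values.size();
        }
    }

    // Behavior before the change: the latency is recorded only when the
    // poll returned at least one record, so empty polls are invisible.
    static int recordOnlyNonEmpty(int[] recordsPerPoll) {
        CountingSensor pollSensor = new CountingSensor();
        for (int recordCount : recordsPerPoll) {
            long pollLatency = 1L; // placeholder latency, assumed for illustration
            if (recordCount > 0) {
                pollSensor.record(pollLatency);
            }
        }
        return pollSensor.count();
    }

    // Behavior after the change: the latency is recorded for every poll,
    // whether or not any records came back.
    static int recordEveryPoll(int[] recordsPerPoll) {
        CountingSensor pollSensor = new CountingSensor();
        for (int recordCount : recordsPerPoll) {
            long pollLatency = 1L; // placeholder latency, assumed for illustration
            pollSensor.record(pollLatency);
        }
        return pollSensor.count();
    }

    public static void main(String[] args) {
        // Five simulated polls; three of them return no records.
        int[] recordsPerPoll = {5, 0, 0, 3, 0};
        System.out.println("before: " + recordOnlyNonEmpty(recordsPerPoll)); // prints "before: 2"
        System.out.println("after: " + recordEveryPoll(recordsPerPoll));     // prints "after: 5"
    }
}
```

With three empty polls out of five, the old placement reports two invocations while the new placement reports all five, which is the undercounting the commit message describes.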