kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.2 updated: MINOR: Avoid dividing by zero (#7143)
Date Sat, 03 Aug 2019 21:21:56 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new f3f3258  MINOR: Avoid dividing by zero (#7143)
f3f3258 is described below

commit f3f32588110092d0c3cc096610a1f66eb65253a8
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Sat Aug 3 14:03:15 2019 -0700

    MINOR: Avoid dividing by zero (#7143)
    
    Reviews: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../apache/kafka/streams/processor/internals/StreamThread.java   | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7a88267..3083ec7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1065,7 +1065,7 @@ public class StreamThread extends Thread {
      *                               or if the task producer got fenced (EOS)
      */
     boolean maybeCommit() {
-        int committed = 0;
+        final int committed;
 
         if (now - lastCommitMs > commitTimeMs) {
             if (log.isTraceEnabled()) {
@@ -1073,7 +1073,7 @@ public class StreamThread extends Thread {
                     taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs,
commitTimeMs);
             }
 
-            committed += taskManager.commitAll();
+            committed = taskManager.commitAll();
             if (committed > 0) {
                 final long intervalCommitLatency = advanceNowAndComputeLatency();
                 streamsMetrics.commitTimeSensor.record(intervalCommitLatency / (double) committed,
now);
@@ -1090,11 +1090,10 @@ public class StreamThread extends Thread {
             lastCommitMs = now;
             processStandbyRecords = true;
         } else {
-            final int commitPerRequested = taskManager.maybeCommitActiveTasksPerUserRequested();
-            if (commitPerRequested > 0) {
+            committed = taskManager.maybeCommitActiveTasksPerUserRequested();
+            if (committed > 0) {
                 final long requestCommitLatency = advanceNowAndComputeLatency();
                 streamsMetrics.commitTimeSensor.record(requestCommitLatency / (double) committed,
now);
-                committed += commitPerRequested;
             }
         }
 


Mime
View raw message