kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkaranta...@apache.org
Subject [kafka] 02/02: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
Date Wed, 06 May 2020 17:25:25 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a42674552ce031a4f1a7fcf6d9433cfa80b2a8b1
Author: Chris Egerton <chrise@confluent.io>
AuthorDate: Wed Apr 29 20:42:13 2020 -0700

    KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
    
    Simple logging additions at TRACE level that should help when the worker can't get caught
up to the end of an internal topic.
    
    Reviewers: Gwen Shapira <cshapi@gmail.com>, Aakash Shah <ashah@confluent.io>,
Konstantine Karantasis <konstantine@confluent.io>
---
 .../java/org/apache/kafka/connect/util/KafkaBasedLog.java    | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e78276a..e301581 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -281,9 +281,15 @@ public class KafkaBasedLog<K, V> {
             Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<TopicPartition, Long> entry = it.next();
-                if (consumer.position(entry.getKey()) >= entry.getValue())
+                TopicPartition topicPartition = entry.getKey();
+                long endOffset = entry.getValue();
+                long lastConsumedOffset = consumer.position(topicPartition);
+                if (lastConsumedOffset >= endOffset) {
+                    log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                     it.remove();
-                else {
+                } else {
+                    log.trace("Behind end offset {} for {}; last-read offset is {}",
+                            endOffset, topicPartition, lastConsumedOffset);
                     poll(Integer.MAX_VALUE);
                     break;
                 }
@@ -345,4 +351,4 @@ public class KafkaBasedLog<K, V> {
             }
         }
     }
-}
\ No newline at end of file
+}


Mime
View raw message