kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Log4j Improvements on Fetcher (#8629)
Date Thu, 07 May 2020 19:06:13 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71397ad  MINOR: Log4j Improvements on Fetcher (#8629)
71397ad is described below

commit 71397adaff8fb4e9dd126c51cd38110cbd813936
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu May 7 12:03:43 2020 -0700

    MINOR: Log4j Improvements on Fetcher (#8629)
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../java/org/apache/kafka/clients/consumer/internals/Fetcher.java   | 6 ++++--
 .../java/org/apache/kafka/common/record/DefaultRecordBatch.java     | 2 ++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 0699684..68c7347 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -676,13 +676,15 @@ public class Fetcher<K, V> implements Closeable {
             if (completedFetch.nextFetchOffset == position.offset) {
                 List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
 
+                log.trace("Returning {} fetched records at offset {} for assigned partition
{}",
+                        partRecords.size(), position, completedFetch.partition);
+
                 if (completedFetch.nextFetchOffset > position.offset) {
                     SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
-                    log.trace("Returning fetched records at offset {} for assigned partition
{} and update " +
-                            "position to {}", position, completedFetch.partition, nextPosition);
+                    log.trace("Update fetching position to {} for partition {}", nextPosition,
completedFetch.partition);
                     subscriptions.position(completedFetch.partition, nextPosition);
                 }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index d4a9587..b49f2fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -484,6 +484,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements
MutableRe
     @Override
     public String toString() {
         return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset()
+ "], " +
+                "sequence=[" + baseSequence() + ", " + lastSequence() + "], " +
+                "isTransactional=" + isTransactional() + ", isControlBatch=" + isControlBatch()
+ ", " +
                 "compression=" + compressionType() + ", timestampType=" + timestampType()
+ ", crc=" + checksum() + ")";
     }
 


Mime
View raw message