kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davidart...@apache.org
Subject [kafka] branch trunk updated: Improve logging in the consumer for epoch updates (#6879)
Date Tue, 04 Jun 2019 21:49:52 GMT
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 264d1d8  Improve logging in the consumer for epoch updates (#6879)
264d1d8 is described below

commit 264d1d8a8b7ede0fb8e3595d0153893966bb73b8
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Tue Jun 4 17:49:30 2019 -0400

    Improve logging in the consumer for epoch updates (#6879)
---
 clients/src/main/java/org/apache/kafka/clients/Metadata.java       | 2 +-
 clients/src/main/java/org/apache/kafka/clients/MetadataCache.java  | 5 ++++-
 .../java/org/apache/kafka/clients/consumer/internals/Fetcher.java  | 7 ++++---
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index f991fa6..94e4eb3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -177,7 +177,7 @@ public class Metadata implements Closeable {
             }
             return true;
         } else {
-            log.debug("Not replacing existing epoch {} with new epoch {}", oldEpoch, epoch);
+            log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, epoch, topicPartition);
             return false;
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index b928b8e..b5c9de6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -137,7 +137,10 @@ public class MetadataCache {
     @Override
     public String toString() {
         return "MetadataCache{" +
-                "cluster=" + cluster() +
+                "clusterId='" + clusterId + '\'' +
+                ", nodes=" + nodes +
+                ", partitions=" + metadataByPartition.values() +
+                ", controller=" + controller +
                 '}';
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e638963..cff6e30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -861,13 +861,14 @@ public class Fetcher<K, V> implements Closeable {
         final Map<TopicPartition, ListOffsetRequest.PartitionData> partitionDataMap = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
+            Long offset = entry.getValue();
             Optional<MetadataCache.PartitionInfoAndEpoch> currentInfo = metadata.partitionInfoIfCurrent(tp);
             if (!currentInfo.isPresent()) {
-                log.debug("Leader for partition {} is unknown for fetching offset", tp);
+                log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
                 metadata.requestUpdate();
                 partitionsToRetry.add(tp);
             } else if (currentInfo.get().partitionInfo().leader() == null) {
-                log.debug("Leader for partition {} is unavailable for fetching offset", tp);
+                log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
                 metadata.requestUpdate();
                 partitionsToRetry.add(tp);
             } else if (client.isUnavailable(currentInfo.get().partitionInfo().leader())) {
@@ -881,7 +882,7 @@ public class Fetcher<K, V> implements Closeable {
                 partitionsToRetry.add(tp);
             } else {
                 partitionDataMap.put(tp,
-                        new ListOffsetRequest.PartitionData(entry.getValue(), Optional.of(currentInfo.get().epoch())));
+                        new ListOffsetRequest.PartitionData(offset, Optional.of(currentInfo.get().epoch())));
             }
         }
         return regroupPartitionMapByNode(partitionDataMap);


Mime
View raw message