kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: [MINOR] Improve consumer logging on LeaveGroup (#5420)
Date Sat, 28 Jul 2018 14:48:06 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08a4cda  [MINOR] Improve consumer logging on LeaveGroup (#5420)
08a4cda is described below

commit 08a4cda34e7535ead76d2a65826ff9e09b8bdd4f
Author: Dhruvil Shah <dhruvil@confluent.io>
AuthorDate: Sat Jul 28 07:48:01 2018 -0700

    [MINOR] Improve consumer logging on LeaveGroup (#5420)
    
    * Improve consumer logging on LeaveGroup
    
    * Add GroupCoordinator logging, and address review comments
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../clients/consumer/CommitFailedException.java    |  2 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../consumer/internals/AbstractCoordinator.java    |  8 ++++++-
 .../kafka/coordinator/group/GroupCoordinator.scala | 26 +++++++++++-----------
 4 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
index c6006b7..cae5b2a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -33,7 +33,7 @@ public class CommitFailedException extends KafkaException {
                 "rebalanced and assigned the partitions to another member. This means that
the time " +
                 "between subsequent calls to poll() was longer than the configured max.poll.interval.ms,
" +
                 "which typically implies that the poll loop is spending too much time message
processing. " +
-                "You can address this either by increasing the session timeout or by reducing
the maximum " +
+                "You can address this either by increasing max.poll.interval.ms or by reducing
the maximum " +
                 "size of batches returned in poll() with max.poll.records.");
     }
 }
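
The reworded exception message steers users toward max.poll.interval.ms and max.poll.records rather than the session timeout. As a rough illustration only (broker address, group id, topic name, and the timeout/batch values below are assumptions, not recommendations), a consumer might tune those two settings like this:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class TunedConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // hypothetical group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // More headroom between poll() calls before the member is considered stalled
            // (default is 300000 ms); 600000 here is purely illustrative.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
            // Smaller batches mean less work per poll() (default is 500 records).
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));   // hypothetical topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
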
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fb37fee..651bd79 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1018,7 +1018,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void unsubscribe() {
         acquireAndEnsureOpen();
         try {
-            log.debug("Unsubscribed all topics or patterns and assigned partitions");
+            log.info("Unsubscribed all topics or patterns and assigned partitions");
             this.subscriptions.unsubscribe();
             this.coordinator.maybeLeaveGroup();
             this.metadata.needMetadataForAllTopics(false);
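
For reference, unsubscribe() is the public entry point that reaches maybeLeaveGroup() below, so the promoted INFO line now pairs with a LeaveGroup request. A fragment-only sketch, reusing the props and imports from the previous example (the topic name is again hypothetical):

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("example-topic"));
        consumer.poll(Duration.ofSeconds(1));  // first poll joins the group and fetches once
        consumer.unsubscribe();                // now logged at INFO; coordinator.maybeLeaveGroup() follows
    }
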
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 53834fb..f9e1c18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -803,7 +803,7 @@ public abstract class AbstractCoordinator implements Closeable {
         if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
-            log.debug("Sending LeaveGroup request to coordinator {}", coordinator);
+            log.info("Sending LeaveGroup request to coordinator {}", coordinator);
             LeaveGroupRequest.Builder request =
                     new LeaveGroupRequest.Builder(groupId, generation.memberId);
             client.send(coordinator, request)
@@ -1032,6 +1032,12 @@ public abstract class AbstractCoordinator implements Closeable {
                         } else if (heartbeat.pollTimeoutExpired(now)) {
                             // the poll timeout has expired, which means that the foreground thread has stalled
                             // in between calls to poll(), so we explicitly leave the group.
+                            log.warn("This member will leave the group because consumer poll timeout has expired. This " +
+                                    "means the time between subsequent calls to poll() was longer than the configured " +
+                                    "max.poll.interval.ms, which typically implies that the poll loop is spending too " +
+                                    "much time processing messages. You can address this either by increasing " +
+                                    "max.poll.interval.ms or by reducing the maximum size of batches returned in poll() " +
+                                    "with max.poll.records.");
                             maybeLeaveGroup();
                         } else if (!heartbeat.shouldHeartbeat(now)) {
                             // poll again after waiting for the retry backoff in case the heartbeat failed or the
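
The new warning spells out why a member leaves when the poll timeout expires: the gap between poll() calls exceeded max.poll.interval.ms. A fragment-only sketch of a poll loop that watches for this, again reusing the consumer and imports from the first example plus org.apache.kafka.clients.consumer.ConsumerRecord; process() is a hypothetical per-record handler, and the 240000 ms threshold is an arbitrary ~80% of the default 300000 ms interval:

    consumer.subscribe(Collections.singletonList("example-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        long start = System.currentTimeMillis();
        for (ConsumerRecord<String, String> record : records) {
            process(record);  // application logic; total batch time must stay well under max.poll.interval.ms
        }
        long elapsedMs = System.currentTimeMillis() - start;
        if (elapsedMs > 240_000L) {
            System.err.println("Slow batch: " + records.count() + " records took " + elapsedMs
                    + " ms; consider lowering max.poll.records or raising max.poll.interval.ms");
        }
        consumer.commitSync();  // assumes enable.auto.commit=false; otherwise optional
    }
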
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 2cedacd..6ca443f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -291,7 +291,7 @@ class GroupCoordinator(val brokerId: Int,
                   if (group.is(CompletingRebalance) && generationId == group.generationId) {
                     if (error != Errors.NONE) {
                       resetAndPropagateAssignmentError(group, error)
-                      maybePrepareRebalance(group)
+                      maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
                     } else {
                       setAndPropagateAssignment(group, assignment)
                       group.transitionTo(Stable)
@@ -333,7 +333,7 @@ class GroupCoordinator(val brokerId: Int,
             val member = group.get(memberId)
             removeHeartbeatForLeavingMember(group, member)
             debug(s"Member ${member.memberId} in group ${group.groupId} has left, removing
it from the group")
-            removeMemberAndUpdateGroup(group, member)
+            removeMemberAndUpdateGroup(group, member, s"removing member $memberId on LeaveGroup")
             responseCallback(Errors.NONE)
           }
         }
@@ -700,7 +700,7 @@ class GroupCoordinator(val brokerId: Int,
                                     protocolType: String,
                                     protocols: List[(String, Array[Byte])],
                                     group: GroupMetadata,
-                                    callback: JoinCallback) = {
+                                    callback: JoinCallback): MemberMetadata = {
     val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
@@ -710,7 +710,7 @@ class GroupCoordinator(val brokerId: Int,
       group.newMemberAdded = true
 
     group.add(member)
-    maybePrepareRebalance(group)
+    maybePrepareRebalance(group, s"Adding new member $memberId")
     member
   }
 
@@ -720,17 +720,17 @@ class GroupCoordinator(val brokerId: Int,
                                        callback: JoinCallback) {
     member.supportedProtocols = protocols
     member.awaitingJoinCallback = callback
-    maybePrepareRebalance(group)
+    maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}")
   }
 
-  private def maybePrepareRebalance(group: GroupMetadata) {
+  private def maybePrepareRebalance(group: GroupMetadata, reason: String) {
     group.inLock {
       if (group.canRebalance)
-        prepareRebalance(group)
+        prepareRebalance(group, reason)
     }
   }
 
-  private def prepareRebalance(group: GroupMetadata) {
+  private def prepareRebalance(group: GroupMetadata, reason: String) {
     // if any members are awaiting sync, cancel their request and have them rejoin
     if (group.is(CompletingRebalance))
       resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
@@ -747,18 +747,18 @@ class GroupCoordinator(val brokerId: Int,
 
     group.transitionTo(PreparingRebalance)
 
-    info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
-      s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
+    info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
+      s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")
 
     val groupKey = GroupKey(group.groupId)
     joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
   }
 
-  private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata) {
+  private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String) {
     group.remove(member.memberId)
     group.currentState match {
       case Dead | Empty =>
-      case Stable | CompletingRebalance => maybePrepareRebalance(group)
+      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
       case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
     }
   }
@@ -837,7 +837,7 @@ class GroupCoordinator(val brokerId: Int,
     group.inLock {
       if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it
from the group")
-        removeMemberAndUpdateGroup(group, member)
+        removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
       }
     }
   }

