kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3318: clean up consumer logging and error messages
Date Thu, 10 Mar 2016 19:29:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6eb061fa8 -> e403b3c4b


KAFKA-3318: clean up consumer logging and error messages

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma

Closes #1036 from hachikuji/KAFKA-3318


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e403b3c4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e403b3c4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e403b3c4

Branch: refs/heads/trunk
Commit: e403b3c4bf8ca308fe180b093da20700f4db73c5
Parents: 6eb061f
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Mar 10 11:29:08 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Mar 10 11:29:08 2016 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 120 +++++++++----------
 .../consumer/internals/ConsumerCoordinator.java |  81 +++++++------
 .../clients/consumer/internals/Fetcher.java     |   6 +-
 .../main/java/org/apache/kafka/common/Node.java |   2 +-
 .../apache/kafka/common/protocol/Errors.java    |  10 ++
 5 files changed, 115 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c6492bc..c79d8e7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -174,7 +174,7 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     public void ensureCoordinatorKnown() {
         while (coordinatorUnknown()) {
-            RequestFuture<Void> future = sendGroupMetadataRequest();
+            RequestFuture<Void> future = sendGroupCoordinatorRequest();
             client.poll(future);
 
             if (future.failed()) {
@@ -216,7 +216,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 continue;
             }
 
-            RequestFuture<ByteBuffer> future = performGroupJoin();
+            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
             client.poll(future);
 
             if (future.succeeded()) {
@@ -299,12 +299,12 @@ public abstract class AbstractCoordinator implements Closeable {
      * elected leader by the coordinator.
      * @return A request future which wraps the assignment returned from the group leader
      */
-    private RequestFuture<ByteBuffer> performGroupJoin() {
+    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
 
         // send a join group request to the coordinator
-        log.debug("(Re-)joining group {}", groupId);
+        log.info("(Re-)joining group {}", groupId);
         JoinGroupRequest request = new JoinGroupRequest(
                 groupId,
                 this.sessionTimeoutMs,
@@ -312,8 +312,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 protocolType(),
                 metadata());
 
-        // create the request for the coordinator
-        log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());
+        log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
         return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                 .compose(new JoinGroupResponseHandler());
     }
@@ -328,10 +327,9 @@ public abstract class AbstractCoordinator implements Closeable {
 
         @Override
         public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
-            // process the response
-            short errorCode = joinResponse.errorCode();
-            if (errorCode == Errors.NONE.code()) {
-                log.debug("Joined group: {}", joinResponse.toStruct());
+            Errors error = Errors.forCode(joinResponse.errorCode());
+            if (error == Errors.NONE) {
+                log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
                 AbstractCoordinator.this.memberId = joinResponse.memberId();
                 AbstractCoordinator.this.generation = joinResponse.generationId();
                 AbstractCoordinator.this.rejoinNeeded = false;
@@ -342,37 +340,33 @@ public abstract class AbstractCoordinator implements Closeable {
                 } else {
                     onJoinFollower().chain(future);
                 }
-            } else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
-                log.debug("Attempt to join group {} rejected since coordinator is loading the group.", groupId);
+            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+                log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
+                        coordinator);
                 // backoff and retry
-                future.raise(Errors.forCode(errorCode));
-            } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
+                future.raise(error);
+            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 // reset the member id and retry immediately
                 AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
-                log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.",
-                        groupId);
+                log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
-            } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
-                    || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                 // re-discover the coordinator and retry with backoff
                 coordinatorDead();
-                log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
-                    groupId);
-                future.raise(Errors.forCode(errorCode));
-            } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code()
-                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()
-                    || errorCode == Errors.INVALID_GROUP_ID.code()) {
+                log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
+                future.raise(error);
+            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
+                    || error == Errors.INVALID_SESSION_TIMEOUT
+                    || error == Errors.INVALID_GROUP_ID) {
                 // log the error and re-throw the exception
-                Errors error = Errors.forCode(errorCode);
-                log.error("Attempt to join group {} failed due to: {}",
-                        groupId, error.exception().getMessage());
+                log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
                 future.raise(error);
-            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
             } else {
                 // unexpected error, throw the exception
-                future.raise(new KafkaException("Unexpected error in join group response: "
-                        + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
+                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
             }
         }
     }
@@ -381,7 +375,7 @@ public abstract class AbstractCoordinator implements Closeable {
         // send follower's sync group with an empty assignment
         SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
                 memberId, Collections.<String, ByteBuffer>emptyMap());
-        log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP,
request, this.coordinator.id());
+        log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId,
this.coordinator, request);
         return sendSyncGroupRequest(request);
     }
 
@@ -392,7 +386,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     joinResponse.members());
 
             SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId,
groupAssignment);
-            log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP,
request, this.coordinator.id());
+            log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId,
this.coordinator, request);
             return sendSyncGroupRequest(request);
         } catch (RuntimeException e) {
             return RequestFuture.failure(e);
@@ -418,7 +412,7 @@ public abstract class AbstractCoordinator implements Closeable {
                            RequestFuture<ByteBuffer> future) {
             Errors error = Errors.forCode(syncResponse.errorCode());
             if (error == Errors.NONE) {
-                log.debug("Received successful sync group response for group {}: {}", groupId,
syncResponse.toStruct());
+                log.info("Successfully joined group {} with generation {}", groupId, generation);
                 sensors.syncLatency.record(response.requestLatencyMs());
                 future.complete(syncResponse.memberAssignment());
             } else {
@@ -426,20 +420,20 @@ public abstract class AbstractCoordinator implements Closeable {
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                    log.info("SyncGroup for group {} failed due to coordinator rebalance,
rejoining the group", groupId);
+                    log.debug("SyncGroup for group {} failed due to coordinator rebalance",
groupId);
                     future.raise(error);
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
-                    log.info("SyncGroup for group {} failed due to {}, rejoining the group",
groupId, error);
+                    log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                     AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                     future.raise(error);
                 } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
-                    log.info("SyncGroup for group {} failed due to {}, will find new coordinator
and rejoin", groupId, error);
+                    log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                     coordinatorDead();
                     future.raise(error);
                 } else {
-                    future.raise(new KafkaException("Unexpected error from SyncGroup: " +
error.exception().getMessage()));
+                    future.raise(new KafkaException("Unexpected error from SyncGroup: " +
error.message()));
                 }
             }
         }
@@ -450,7 +444,7 @@ public abstract class AbstractCoordinator implements Closeable {
      * one of the brokers. The returned future should be polled to get the result of the
request.
      * @return A request future which indicates the completion of the metadata request
      */
-    private RequestFuture<Void> sendGroupMetadataRequest() {
+    private RequestFuture<Void> sendGroupCoordinatorRequest() {
         // initiate the group metadata request
         // find a node to ask about the coordinator
         Node node = this.client.leastLoadedNode();
@@ -460,7 +454,7 @@ public abstract class AbstractCoordinator implements Closeable {
             return RequestFuture.noBrokersAvailable();
         } else {
             // create a group  metadata request
-            log.debug("Issuing group metadata request to broker {}", node.id());
+            log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
             GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
             return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                     .compose(new RequestFutureAdapter<ClientResponse, Void>() {
@@ -473,7 +467,7 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
-        log.debug("Group metadata response {}", resp);
+        log.debug("Received group coordinator response {}", resp);
 
         if (!coordinatorUnknown()) {
             // We already found the coordinator, so ignore the request
@@ -483,22 +477,24 @@ public abstract class AbstractCoordinator implements Closeable {
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            short errorCode = groupCoordinatorResponse.errorCode();
-            if (errorCode == Errors.NONE.code()) {
+            Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
+            if (error == Errors.NONE) {
                 this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                         groupCoordinatorResponse.node().host(),
                         groupCoordinatorResponse.node().port());
 
+                log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
+
                 client.tryConnect(coordinator);
 
                 // start sending heartbeats only if we have a valid generation
                 if (generation > 0)
                     heartbeatTask.reset();
                 future.complete(null);
-            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
             } else {
-                future.raise(Errors.forCode(errorCode));
+                future.raise(error);
             }
         }
     }
@@ -524,7 +520,7 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     protected void coordinatorDead() {
         if (this.coordinator != null) {
-            log.info("Marking the coordinator {} dead.", this.coordinator.id());
+            log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
             this.coordinator = null;
         }
     }
@@ -566,7 +562,7 @@ public abstract class AbstractCoordinator implements Closeable {
 
             @Override
             public void onFailure(RuntimeException e) {
-                log.info("LeaveGroup request failed with error", e);
+                log.debug("LeaveGroup request for group {} failed with error", groupId, e);
             }
         });
 
@@ -608,33 +604,33 @@ public abstract class AbstractCoordinator implements Closeable {
         @Override
         public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void>
future) {
             sensors.heartbeatLatency.record(response.requestLatencyMs());
-            short errorCode = heartbeatResponse.errorCode();
-            if (errorCode == Errors.NONE.code()) {
-                log.debug("Received successful heartbeat response.");
+            Errors error = Errors.forCode(heartbeatResponse.errorCode());
+            if (error == Errors.NONE) {
+                log.debug("Received successful heartbeat response for group {}", groupId);
                 future.complete(null);
-            } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
-                    || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
-                log.info("Attempt to heart beat failed since coordinator is either not started
or not valid, marking it as dead.");
+            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+                log.debug("Attempt to heart beat failed for group {} since coordinator {}
is either not started or not valid.",
+                        groupId, coordinator);
                 coordinatorDead();
-                future.raise(Errors.forCode(errorCode));
-            } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
-                log.info("Attempt to heart beat failed since the group is rebalancing, try
to re-join group.");
+                future.raise(error);
+            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
+                log.debug("Attempt to heart beat failed for group {} since it is rebalancing.",
groupId);
                 AbstractCoordinator.this.rejoinNeeded = true;
                 future.raise(Errors.REBALANCE_IN_PROGRESS);
-            } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) {
-                log.info("Attempt to heart beat failed since generation id is not legal,
try to re-join group.");
+            } else if (error == Errors.ILLEGAL_GENERATION) {
+                log.debug("Attempt to heart beat failed for group {} since generation id
is not legal.", groupId);
                 AbstractCoordinator.this.rejoinNeeded = true;
                 future.raise(Errors.ILLEGAL_GENERATION);
-            } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
-                log.info("Attempt to heart beat failed since member id is not valid, reset
it and try to re-join group.");
+            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
+                log.debug("Attempt to heart beat failed for group {} since member id is not
valid.", groupId);
                 memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                 AbstractCoordinator.this.rejoinNeeded = true;
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
-            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
             } else {
-                future.raise(new KafkaException("Unexpected errorCode in heartbeat response: "
-                        + Errors.forCode(errorCode).exception().getMessage()));
+                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index aa39e11..b6b46c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -188,15 +188,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // execute the user's callback after rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
-        log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
+        log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(),
groupId);
         try {
             Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsAssigned(assigned);
         } catch (WakeupException e) {
             throw e;
         } catch (Exception e) {
-            log.error("User provided listener " + listener.getClass().getName()
-                    + " failed on partition assignment: ", e);
+            log.error("User provided listener {} for group {} failed on partition assignment",
+                    listener.getClass().getName(), groupId, e);
         }
     }
 
@@ -222,11 +222,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         metadata.setTopics(this.subscriptions.groupSubscription());
         client.ensureFreshMetadata();
 
-        log.debug("Performing {} assignment for subscriptions {}", assignor.name(), subscriptions);
+        log.debug("Performing assignment for group {} using strategy {} with subscriptions
{}",
+                groupId, assignor.name(), subscriptions);
 
         Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
 
-        log.debug("Finished assignment: {}", assignment);
+        log.debug("Finished assignment for group {}: {}", groupId, assignment);
 
         Map<String, ByteBuffer> groupAssignment = new HashMap<>();
         for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet())
{
@@ -244,15 +245,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // execute the user's callback before rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
-        log.debug("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
+        log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(),
groupId);
         try {
             Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsRevoked(revoked);
         } catch (WakeupException e) {
             throw e;
         } catch (Exception e) {
-            log.error("User provided listener " + listener.getClass().getName()
-                    + " failed on partition revocation: ", e);
+            log.error("User provided listener {} for group {} failed on partition revocation",
+                    listener.getClass().getName(), groupId, e);
         }
 
         subscriptions.needReassignment();
@@ -410,7 +411,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 return;
 
             if (coordinatorUnknown()) {
-                log.debug("Cannot auto-commit offsets now since the coordinator is unknown,
will retry after backoff");
+                log.debug("Cannot auto-commit offsets for group {} since the coordinator
is unknown", groupId);
                 client.schedule(this, now + retryBackoffMs);
                 return;
             }
@@ -423,10 +424,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     if (exception == null) {
                         reschedule(now + interval);
                     } else if (exception instanceof SendFailedException) {
-                        log.debug("Failed to send automatic offset commit, will retry immediately");
+                        log.debug("Failed to send automatic offset commit for group {}",
groupId);
                         reschedule(now);
                     } else {
-                        log.warn("Auto offset commit failed: {}", exception.getMessage());
+                        log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                         reschedule(now + interval);
                     }
                 }
@@ -447,7 +448,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 throw e;
             } catch (Exception e) {
                 // consistent with async auto-commit failures, we do not propagate the exception
-                log.warn("Auto offset commit failed: ", e.getMessage());
+                log.warn("Auto offset commit failed for group {}: {}", groupId, e.getMessage());
             }
         }
     }
@@ -481,7 +482,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 offsetData);
 
-        log.trace("Sending offset-commit request with {} to {}", offsets, coordinator);
+        log.trace("Sending offset-commit request with {} to coordinator {} for group {}",
offsets, coordinator, groupId);
 
         return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
                 .compose(new OffsetCommitResponseHandler(offsets));
@@ -520,12 +521,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
                 Errors error = Errors.forCode(entry.getValue());
                 if (error == Errors.NONE) {
-                    log.debug("Committed offset {} for partition {}", offset, tp);
+                    log.debug("Group {} committed offset {} for partition {}", groupId, offset,
tp);
                     if (subscriptions.isAssigned(tp))
                         // update the local cache only if the partition is still assigned
                         subscriptions.committed(tp, offsetAndMetadata);
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    log.error("Unauthorized to commit for group {}", groupId);
+                    log.error("Not authorized to commit offsets for group {}", groupId);
                     future.raise(new GroupAuthorizationException(groupId));
                     return;
                 } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
@@ -533,18 +534,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                         || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                     // raise the error to the user
-                    log.info("Offset commit for group {} failed on partition {} due to {},
will retry", groupId, tp, error);
+                    log.debug("Offset commit for group {} failed on partition {}: {}", groupId,
tp, error.message());
                     future.raise(error);
                     return;
                 } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                     // just retry
-                    log.info("Offset commit for group {} failed due to {}, will retry", groupId,
error);
+                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                     future.raise(error);
                     return;
                 } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR_FOR_GROUP
                         || error == Errors.REQUEST_TIMED_OUT) {
-                    log.info("Offset commit for group {} failed due to {}, will find new
coordinator and retry", groupId, error);
+                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                     coordinatorDead();
                     future.raise(error);
                     return;
@@ -552,19 +553,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         || error == Errors.ILLEGAL_GENERATION
                         || error == Errors.REBALANCE_IN_PROGRESS) {
                     // need to re-join group
-                    log.error("Error {} occurred while committing offsets for group {}",
error, groupId);
+                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                     subscriptions.needReassignment();
-                    future.raise(new CommitFailedException("Commit cannot be completed due to group rebalance"));
+                    future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
+                            "rebalanced and assigned the partitions to another member. This means that the time " +
+                            "between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
+                            "which typically implies that the poll loop is spending too much time message processing. " +
+                            "You can address this either by increasing the session timeout or by reducing the maximum " +
+                            "size of batches returned in poll() with max.poll.records."));
                     return;
                 } else {
-                    log.error("Error committing partition {} at offset {}: {}", tp, offset,
error.exception().getMessage());
-                    future.raise(new KafkaException("Unexpected error in commit: " + error.exception().getMessage()));
+                    log.error("Group {} failed to commit partition {} at offset {}: {}",
groupId, tp, offset, error.message());
+                    future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                     return;
                 }
             }
 
             if (!unauthorizedTopics.isEmpty()) {
-                log.error("Unauthorized to commit to topics {}", unauthorizedTopics);
+                log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics,
groupId);
                 future.raise(new TopicAuthorizationException(unauthorizedTopics));
             } else {
                 future.complete(null);
@@ -583,9 +589,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
 
-        log.debug("Fetching committed offsets for partitions: {}",  partitions);
+        log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
         // construct the request
-        OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+        OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));
 
         // send the request with a callback
         return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
@@ -606,31 +612,30 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 TopicPartition tp = entry.getKey();
                 OffsetFetchResponse.PartitionData data = entry.getValue();
                 if (data.hasError()) {
-                    log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
-                            .exception()
-                            .getMessage());
-                    if (data.errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
+                    Errors error = Errors.forCode(data.errorCode);
+                    log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());
+
+                    if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                         // just retry
-                        future.raise(Errors.GROUP_LOAD_IN_PROGRESS);
-                    } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+                        future.raise(error);
+                    } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                         // re-discover the coordinator and retry
                         coordinatorDead();
-                        future.raise(Errors.NOT_COORDINATOR_FOR_GROUP);
-                    } else if (data.errorCode == Errors.UNKNOWN_MEMBER_ID.code()
-                            || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                        future.raise(error);
+                    } else if (error == Errors.UNKNOWN_MEMBER_ID
+                            || error == Errors.ILLEGAL_GENERATION) {
                         // need to re-join group
                         subscriptions.needReassignment();
-                        future.raise(Errors.forCode(data.errorCode));
+                        future.raise(error);
                     } else {
-                        future.raise(new KafkaException("Unexpected error in fetch offset response: "
-                                + Errors.forCode(data.errorCode).exception().getMessage()));
+                        future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                     }
                     return;
                 } else if (data.offset >= 0) {
                     // record the position with the offset (-1 indicates no committed offset to fetch)
                     offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
                 } else {
-                    log.debug("No committed offset for partition " + tp);
+                    log.debug("Group {} has no committed offset for partition {}", groupId, tp);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7a1a720..b4d5c02 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -501,12 +501,12 @@ public class Fetcher<K, V> {
             future.complete(offset);
         } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                 || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-            log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
+            log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                     topicPartition);
             future.raise(Errors.forCode(errorCode));
         } else {
-            log.error("Attempt to fetch offsets for partition {} failed due to: {}",
-                    topicPartition, Errors.forCode(errorCode).exception().getMessage());
+            log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
+                    topicPartition, Errors.forCode(errorCode).message());
             future.raise(new StaleMetadataException());
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
index 644cd71..24cf6f4 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -96,7 +96,7 @@ public class Node {
 
     @Override
     public String toString() {
-        return "Node(" + id + ", " + host + ", " + port + ")";
+        return host + ":" + port + " (id: " + idString + ")";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ab299af..90be014 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -183,6 +183,16 @@ public enum Errors {
     }
 
     /**
+     * Get a friendly description of the error (if one is available).
+     * @return the error message
+     */
+    public String message() {
+        if (exception != null)
+            return exception.getMessage();
+        return toString();
+    }
+
+    /**
      * Throw the exception if there is one
      */
     public static Errors forCode(short code) {


Mime
View raw message