kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix api exception single argument constructor usage (#6956)
Date Wed, 17 Jul 2019 16:43:21 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53b10a3  MINOR: Fix api exception single argument constructor usage (#6956)
53b10a3 is described below

commit 53b10a37a42769b0c1b91c3a66fbe7f60917cab5
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Jul 17 09:42:57 2019 -0700

    MINOR: Fix api exception single argument constructor usage (#6956)
    
    Api exception types usually have a single argument constructor which accepts the exception
message. However, some types actually use this constructor to initialize a field. This inconsistency
has led to some cases where exception messages were being incorrectly passed to these constructors
and interpreted incorrectly. For example, this leads to confusing messages like the following
in the log when we hit a GROUP_MAX_SIZE_REACHED error:
    ```
    Attempt to join group failed due to fatal error: Consumer group The consumer group has
reached its max size. already has the configured ...
    ```
    This patch fixes the problem by changing these constructors so that the exception message
is provided consistently. This affected `GroupAuthorizationException`, `TopicAuthorizationException`,
and `GroupMaxSizeReachedException`.
    
    Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Kamal Chandraprakash
<kamal.chandraprakash@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../consumer/internals/AbstractCoordinator.java        | 11 ++++++-----
 .../consumer/internals/ConsumerCoordinator.java        |  4 ++--
 .../kafka/clients/producer/internals/Sender.java       |  2 +-
 .../clients/producer/internals/TransactionManager.java |  6 +++---
 .../common/errors/GroupAuthorizationException.java     | 18 ++++++++++++++++--
 .../common/errors/GroupMaxSizeReachedException.java    |  4 ++--
 .../common/errors/TopicAuthorizationException.java     | 18 ++++++++++++++----
 .../java/org/apache/kafka/common/protocol/Errors.java  |  6 ++----
 docs/upgrade.html                                      |  5 +++++
 9 files changed, 51 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 5a5444e..d5faa6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -541,9 +541,10 @@ public abstract class AbstractCoordinator implements Closeable {
                 // log the error and re-throw the exception
                 log.error("Attempt to join group failed due to fatal error: {}", error.message());
                 if (error == Errors.GROUP_MAX_SIZE_REACHED) {
-                    future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
+                    future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId
+
+                            " already has the configured maximum number of members."));
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
+                    future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                 } else {
                     future.raise(error);
                 }
@@ -633,7 +634,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 requestRejoin();
 
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
+                    future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.debug("SyncGroup failed because the group began another rebalance");
                     future.raise(error);
@@ -699,7 +700,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 }
                 future.complete(null);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
+                future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
             } else {
                 log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
                 future.raise(error);
@@ -923,7 +924,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 resetGeneration();
                 future.raise(error);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
+                future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
             } else {
                 future.raise(new KafkaException("Unexpected error in heartbeat response:
" + error.message()));
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 48d0c95..4866986 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -995,7 +995,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         }
 
                         if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                            future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
+                            future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                             return;
                         } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                             unauthorizedTopics.add(tp.topic());
@@ -1090,7 +1090,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     markCoordinatorUnknown();
                     future.raise(error);
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
+                    future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
                 } else {
                     future.raise(new KafkaException("Unexpected error in fetch offset response:
" + error.message()));
                 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 121ddb2..919be28 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -652,7 +652,7 @@ public class Sender implements Runnable {
             } else {
                 final RuntimeException exception;
                 if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
-                    exception = new TopicAuthorizationException(batch.topicPartition.topic());
+                    exception = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
                 else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                     exception = new ClusterAuthorizationException("The producer is not authorized
to do idempotent sends");
                 else
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index ea4d9d0..03b0231 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1302,7 +1302,7 @@ public class TransactionManager {
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
             } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED)
{
-                abortableError(new GroupAuthorizationException(builder.data().key()));
+                abortableError(GroupAuthorizationException.forGroupId(builder.data().key()));
             } else {
                 fatalError(new KafkaException(String.format("Could not find a coordinator
with type %s with key %s due to" +
                         "unexpected error: %s", coordinatorType, builder.data().key(),
@@ -1401,7 +1401,7 @@ public class TransactionManager {
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
+                abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId()));
             } else {
                 fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse:
" + error.message()));
             }
@@ -1463,7 +1463,7 @@ public class TransactionManager {
                     // If the topic is unknown or the coordinator is loading, retry with
the current coordinator
                     continue;
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
-                    abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
+                    abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId()));
                     break;
                 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
                         || error == Errors.INVALID_PRODUCER_EPOCH
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
index c3f0795..22eae3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
@@ -19,13 +19,27 @@ package org.apache.kafka.common.errors;
 public class GroupAuthorizationException extends AuthorizationException {
     private final String groupId;
 
-    public GroupAuthorizationException(String groupId) {
-        super("Not authorized to access group: " + groupId);
+    public GroupAuthorizationException(String message, String groupId) {
+        super(message);
         this.groupId = groupId;
     }
 
+    public GroupAuthorizationException(String message) {
+        this(message, null);
+    }
+
+    /**
+     * Return the group ID that failed authorization. May be null if it is not known
+     * in the context the exception was raised in.
+     *
+     * @return nullable groupId
+     */
     public String groupId() {
         return groupId;
     }
 
+    public static GroupAuthorizationException forGroupId(String groupId) {
+        return new GroupAuthorizationException("Not authorized to access group: " + groupId,
groupId);
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java
b/clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java
index 086b79f..85d0c7d 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java
@@ -22,7 +22,7 @@ package org.apache.kafka.common.errors;
 public class GroupMaxSizeReachedException extends ApiException {
     private static final long serialVersionUID = 1L;
 
-    public GroupMaxSizeReachedException(String groupId) {
-        super("Consumer group " + groupId + " already has the configured maximum number of
members.");
+    public GroupMaxSizeReachedException(String message) {
+        super(message);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
index 6bf260d..e2235f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
@@ -22,15 +22,25 @@ import java.util.Set;
 public class TopicAuthorizationException extends AuthorizationException {
     private final Set<String> unauthorizedTopics;
 
-    public TopicAuthorizationException(Set<String> unauthorizedTopics) {
-        super("Not authorized to access topics: " + unauthorizedTopics);
+    public TopicAuthorizationException(String message, Set<String> unauthorizedTopics)
{
+        super(message);
         this.unauthorizedTopics = unauthorizedTopics;
     }
 
-    public TopicAuthorizationException(String unauthorizedTopic) {
-        this(Collections.singleton(unauthorizedTopic));
+    public TopicAuthorizationException(Set<String> unauthorizedTopics) {
+        this("Not authorized to access topics: " + unauthorizedTopics, unauthorizedTopics);
+    }
+
+    public TopicAuthorizationException(String message) {
+        this(message, Collections.emptySet());
     }
 
+    /**
+     * Get the set of topics which failed authorization. May be empty if the set is not known
+     * in the context the exception was raised in.
+     *
+     * @return possibly empty set of unauthorized topics
+     */
     public Set<String> unauthorizedTopics() {
         return unauthorizedTopics;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 7e39f69..89bc051 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -185,10 +185,8 @@ public enum Errors {
             RebalanceInProgressException::new),
     INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not valid.",
             InvalidCommitOffsetSizeException::new),
-    TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.",
-            TopicAuthorizationException::new),
-    GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.",
-            GroupAuthorizationException::new),
+    TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.", TopicAuthorizationException::new),
+    GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.", GroupAuthorizationException::new),
     CLUSTER_AUTHORIZATION_FAILED(31, "Cluster authorization failed.",
             ClusterAuthorizationException::new),
     INVALID_TIMESTAMP(32, "The timestamp of the message is out of acceptable range.",
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e6bdd4a..9d0e738 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -24,6 +24,11 @@
     <li>The <code>bin/kafka-preferred-replica-election.sh</code> command
line tool has been deprecated. It has been replaced by <code>bin/kafka-leader-election.sh</code>.</li>
     <li>The methods <code>electPreferredLeaders</code> in the Java <code>AdminClient</code>
class have been deprecated in favor of the methods <code>electLeaders</code>.</li>
     <li>Scala code leveraging the <code>NewTopic(String, int, short)</code>
constructor with literal values will need to explicitly call <code>toShort</code>
on the second literal.</li>
+    <li>The argument in the constructor <code>GroupAuthorizationException(String)</code>
is now used to specify an exception message.
+        Previously it referred to the group that failed authorization. This was done for
consistency with other exception types and to
+        avoid potential misuse. The constructor <code>TopicAuthorizationException(String)</code>
which was previously used for a single
+        unauthorized topic was changed similarly.
+    </li>
 </ul>
 
 <h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>


Mime
View raw message