kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (#7086)
Date Tue, 16 Jul 2019 12:12:09 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 27eba24  KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (#7086)
27eba24 is described below

commit 27eba2494b698ba48c6fbf3851ed36b5a287db22
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Jul 16 13:08:16 2019 +0100

    KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (#7086)
    
    Ensure that Producer#send() throws topic metadata exceptions only for the topic being
    sent to and not for other topics in the producer's metadata instance. Also removes topics
    from consumer's metadata when a topic is removed using manual assignment.
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
---
 .../java/org/apache/kafka/clients/Metadata.java    | 87 +++++++++++++++-----
 .../consumer/internals/ConsumerNetworkClient.java  |  2 +-
 .../consumer/internals/SubscriptionState.java      |  8 +-
 .../kafka/clients/producer/KafkaProducer.java      |  2 +-
 .../producer/internals/ProducerMetadata.java       |  3 +-
 .../org/apache/kafka/clients/MetadataTest.java     | 47 +++++++++--
 .../consumer/internals/ConsumerMetadataTest.java   |  5 ++
 .../internals/ConsumerNetworkClientTest.java       |  3 +-
 .../producer/internals/ProducerMetadataTest.java   | 10 +++
 .../DelegationTokenEndToEndAuthorizationTest.scala |  1 +
 .../kafka/api/EndToEndAuthorizationTest.scala      | 95 +++++++++++++++++++---
 .../kafka/api/SaslEndToEndAuthorizationTest.scala  |  1 +
 .../SaslPlainSslEndToEndAuthorizationTest.scala    |  2 +
 13 files changed, 223 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index f991fa6..81ad4a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -44,6 +45,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 /**
  * A class encapsulating some of the logic around metadata.
@@ -66,7 +68,8 @@ public class Metadata implements Closeable {
     private long lastRefreshMs;
     private long lastSuccessfulRefreshMs;
     private KafkaException fatalException;
-    private KafkaException recoverableException;
+    private Set<String> invalidTopics;
+    private Set<String> unauthorizedTopics;
     private MetadataCache cache = MetadataCache.empty();
     private boolean needUpdate;
     private final ClusterResourceListeners clusterResourceListeners;
@@ -97,6 +100,8 @@ public class Metadata implements Closeable {
         this.clusterResourceListeners = clusterResourceListeners;
         this.isClosed = false;
         this.lastSeenLeaderEpochs = new HashMap<>();
+        this.invalidTopics = Collections.emptySet();
+        this.unauthorizedTopics = Collections.emptySet();
     }
 
     /**
@@ -204,16 +209,6 @@ public class Metadata implements Closeable {
         }
     }
 
-    /**
-     * If any non-retriable exceptions were encountered during metadata update, clear and return the exception.
-     */
-    public synchronized KafkaException getAndClearMetadataException() {
-        KafkaException metadataException = Optional.ofNullable(fatalException).orElse(recoverableException);
-        fatalException = null;
-        recoverableException = null;
-        return metadataException;
-    }
-
     public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
         this.needUpdate = true;
         this.lastRefreshMs = now;
@@ -271,8 +266,7 @@ public class Metadata implements Closeable {
     }
 
     private void maybeSetMetadataError(Cluster cluster) {
-        // if we encounter any invalid topics, cache the exception to later throw to the user
-        recoverableException = null;
+        clearRecoverableErrors();
         checkInvalidTopics(cluster);
         checkUnauthorizedTopics(cluster);
     }
@@ -280,16 +274,14 @@ public class Metadata implements Closeable {
     private void checkInvalidTopics(Cluster cluster) {
         if (!cluster.invalidTopics().isEmpty()) {
             log.error("Metadata response reported invalid topics {}", cluster.invalidTopics());
-            // We may be able to recover from this exception if metadata for this topic is no longer needed
-            recoverableException = new InvalidTopicException(cluster.invalidTopics());
+            invalidTopics = new HashSet<>(cluster.invalidTopics());
         }
     }
 
     private void checkUnauthorizedTopics(Cluster cluster) {
         if (!cluster.unauthorizedTopics().isEmpty()) {
             log.error("Topic authorization failed for topics {}", cluster.unauthorizedTopics());
-            // We may be able to recover from this exception if metadata for this topic is no longer needed
-            recoverableException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
+            unauthorizedTopics = new HashSet<>(cluster.unauthorizedTopics());
         }
     }
 
@@ -360,12 +352,69 @@ public class Metadata implements Closeable {
         }
     }
 
-    public synchronized void maybeThrowException() {
-        KafkaException metadataException = getAndClearMetadataException();
+    /**
+     * If any non-retriable exceptions were encountered during metadata update, clear and throw the exception.
+     * This is used by the consumer to propagate any fatal exceptions or topic exceptions for any of the topics
+     * in the consumer's Metadata.
+     */
+    public synchronized void maybeThrowAnyException() {
+        clearErrorsAndMaybeThrowException(this::recoverableException);
+    }
+
+    /**
+     * If any fatal exceptions were encountered during metadata update, throw the exception. This is used by
+     * the producer to abort waiting for metadata if there were fatal exceptions (e.g. authentication failures)
+     * in the last metadata update.
+     */
+    public synchronized void maybeThrowFatalException() {
+        KafkaException metadataException = this.fatalException;
+        if (metadataException != null) {
+            fatalException = null;
+            throw metadataException;
+        }
+    }
+
+    /**
+     * If any non-retriable exceptions were encountered during metadata update, throw exception if the exception
+     * is fatal or related to the specified topic. All exceptions from the last metadata update are cleared.
+     * This is used by the producer to propagate topic metadata errors for send requests.
+     */
+    public synchronized void maybeThrowExceptionForTopic(String topic) {
+        clearErrorsAndMaybeThrowException(() -> recoverableExceptionForTopic(topic));
+    }
+
+    private void clearErrorsAndMaybeThrowException(Supplier<KafkaException> recoverableExceptionSupplier) {
+        KafkaException metadataException = Optional.ofNullable(fatalException).orElseGet(recoverableExceptionSupplier);
+        fatalException = null;
+        clearRecoverableErrors();
         if (metadataException != null)
             throw metadataException;
     }
 
+    // We may be able to recover from this exception if metadata for this topic is no longer needed
+    private KafkaException recoverableException() {
+        if (!unauthorizedTopics.isEmpty())
+            return new TopicAuthorizationException(unauthorizedTopics);
+        else if (!invalidTopics.isEmpty())
+            return new InvalidTopicException(invalidTopics);
+        else
+            return null;
+    }
+
+    private KafkaException recoverableExceptionForTopic(String topic) {
+        if (unauthorizedTopics.contains(topic))
+            return new TopicAuthorizationException(Collections.singleton(topic));
+        else if (invalidTopics.contains(topic))
+            return new InvalidTopicException(Collections.singleton(topic));
+        else
+            return null;
+    }
+
+    private void clearRecoverableErrors() {
+        invalidTopics = Collections.emptySet();
+        unauthorizedTopics = Collections.emptySet();
+    }
+
     /**
      * Record an attempt to update the metadata that failed. We need to keep track of this
      * to avoid retrying immediately.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 2a6e8b5..5a6f860 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -293,7 +293,7 @@ public class ConsumerNetworkClient implements Closeable {
         // called without the lock to avoid deadlock potential if handlers need to acquire
locks
         firePendingCompletedRequests();
 
-        metadata.maybeThrowException();
+        metadata.maybeThrowAnyException();
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 281578b..4504674 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -156,8 +156,12 @@ public class SubscriptionState {
             return false;
 
         subscription = topicsToSubscribe;
-        groupSubscription = new HashSet<>(groupSubscription);
-        groupSubscription.addAll(topicsToSubscribe);
+        if (subscriptionType != SubscriptionType.USER_ASSIGNED) {
+            groupSubscription = new HashSet<>(groupSubscription);
+            groupSubscription.addAll(topicsToSubscribe);
+        } else {
+            groupSubscription = new HashSet<>(topicsToSubscribe);
+        }
         return true;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 445bed1..5bda546 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1012,7 +1012,7 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
                         String.format("Partition %d of topic %s with partition count %d is
not present in metadata after %d ms.",
                                 partition, topic, partitionsCount, maxWaitMs));
             }
-            metadata.maybeThrowException();
+            metadata.maybeThrowExceptionForTopic(topic);
             remainingWaitMs = maxWaitMs - elapsed;
             partitionsCount = cluster.partitionCountForTopic(topic);
         } while (partitionsCount == null || (partition != null && partition >=
partitionsCount));
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
index 295036b..ef53af4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
@@ -95,7 +95,8 @@ public class ProducerMetadata extends Metadata {
         long currentTimeMs = time.milliseconds();
         long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs
+ timeoutMs;
         time.waitObject(this, () -> {
-            maybeThrowException();
+            // Throw fatal exceptions, if there are any. Recoverable topic errors will be
handled by the caller.
+            maybeThrowFatalException();
             return updateVersion() > lastVersion || isClosed();
         }, deadlineMs);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index a0d0819..bb34094 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -403,18 +403,18 @@ public class MetadataTest {
                 Collections.singletonMap(invalidTopic, Errors.INVALID_TOPIC_EXCEPTION), Collections.emptyMap());
         metadata.update(invalidTopicResponse, time.milliseconds());
 
-        InvalidTopicException e = assertThrows(InvalidTopicException.class, () -> metadata.maybeThrowException());
+        InvalidTopicException e = assertThrows(InvalidTopicException.class, () -> metadata.maybeThrowAnyException());
 
         assertEquals(Collections.singleton(invalidTopic), e.invalidTopics());
         // We clear the exception once it has been raised to the user
-        assertNull(metadata.getAndClearMetadataException());
+        metadata.maybeThrowAnyException();
 
         // Reset the invalid topic error
         metadata.update(invalidTopicResponse, time.milliseconds());
 
         // If we get a good update, the error should clear even if we haven't had a chance
to raise it to the user
         metadata.update(emptyMetadataResponse(), time.milliseconds());
-        assertNull(metadata.getAndClearMetadataException());
+        metadata.maybeThrowAnyException();
     }
 
     @Test
@@ -426,17 +426,52 @@ public class MetadataTest {
                 Collections.singletonMap(invalidTopic, Errors.TOPIC_AUTHORIZATION_FAILED),
Collections.emptyMap());
         metadata.update(unauthorizedTopicResponse, time.milliseconds());
 
-        TopicAuthorizationException e = assertThrows(TopicAuthorizationException.class, ()
-> metadata.maybeThrowException());
+        TopicAuthorizationException e = assertThrows(TopicAuthorizationException.class, ()
-> metadata.maybeThrowAnyException());
         assertEquals(Collections.singleton(invalidTopic), e.unauthorizedTopics());
         // We clear the exception once it has been raised to the user
-        assertNull(metadata.getAndClearMetadataException());
+        metadata.maybeThrowAnyException();
 
         // Reset the unauthorized topic error
         metadata.update(unauthorizedTopicResponse, time.milliseconds());
 
         // If we get a good update, the error should clear even if we haven't had a chance
to raise it to the user
         metadata.update(emptyMetadataResponse(), time.milliseconds());
-        assertNull(metadata.getAndClearMetadataException());
+        metadata.maybeThrowAnyException();
+    }
+
+    @Test
+    public void testMetadataTopicErrors() {
+        Time time = new MockTime();
+
+        Map<String, Errors> topicErrors = new HashMap<>(3);
+        topicErrors.put("invalidTopic", Errors.INVALID_TOPIC_EXCEPTION);
+        topicErrors.put("sensitiveTopic1", Errors.TOPIC_AUTHORIZATION_FAILED);
+        topicErrors.put("sensitiveTopic2", Errors.TOPIC_AUTHORIZATION_FAILED);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("clusterId", 1,
topicErrors, Collections.emptyMap());
+
+        metadata.update(metadataResponse, time.milliseconds());
+        TopicAuthorizationException e1 = assertThrows(TopicAuthorizationException.class,
+            () -> metadata.maybeThrowExceptionForTopic("sensitiveTopic1"));
+        assertEquals(Collections.singleton("sensitiveTopic1"), e1.unauthorizedTopics());
+        // We clear the exception once it has been raised to the user
+        metadata.maybeThrowAnyException();
+
+        metadata.update(metadataResponse, time.milliseconds());
+        TopicAuthorizationException e2 = assertThrows(TopicAuthorizationException.class,
+            () -> metadata.maybeThrowExceptionForTopic("sensitiveTopic2"));
+        assertEquals(Collections.singleton("sensitiveTopic2"), e2.unauthorizedTopics());
+        metadata.maybeThrowAnyException();
+
+        metadata.update(metadataResponse, time.milliseconds());
+        InvalidTopicException e3 = assertThrows(InvalidTopicException.class,
+            () -> metadata.maybeThrowExceptionForTopic("invalidTopic"));
+        assertEquals(Collections.singleton("invalidTopic"), e3.invalidTopics());
+        metadata.maybeThrowAnyException();
+
+        // Other topics should not throw exception, but they should clear existing exception
+        metadata.update(metadataResponse, time.milliseconds());
+        metadata.maybeThrowExceptionForTopic("anotherTopic");
+        metadata.maybeThrowAnyException();
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 86740f5..33d102d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -92,6 +92,11 @@ public class ConsumerMetadataTest {
                 new TopicPartition("bar", 0),
                 new TopicPartition("__consumer_offsets", 0)));
         testBasicSubscription(Utils.mkSet("foo", "bar"), Utils.mkSet("__consumer_offsets"));
+
+        subscription.assignFromUser(Utils.mkSet(
+                new TopicPartition("baz", 0),
+                new TopicPartition("__consumer_offsets", 0)));
+        testBasicSubscription(Utils.mkSet("baz"), Utils.mkSet("__consumer_offsets"));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 9c555e3..90b0fc5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -242,7 +241,7 @@ public class ConsumerNetworkClientTest {
             fail("Expected authentication error thrown");
         } catch (AuthenticationException e) {
             // After the exception is raised, it should have been cleared
-            assertNull(metadata.getAndClearMetadataException());
+            metadata.maybeThrowAnyException();
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
index aaf3857..3e29d61 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
@@ -17,10 +17,12 @@
 package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -37,6 +39,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
 
 public class ProducerMetadataTest {
 
@@ -174,6 +177,13 @@ public class ProducerMetadataTest {
         }
     }
 
+    @Test
+    public void testMetadataWaitAbortedOnFatalException() throws Exception {
+        Time time = new MockTime();
+        metadata.failedUpdate(time.milliseconds(), new AuthenticationException("Fatal exception
from test"));
+        assertThrows(AuthenticationException.class, () -> metadata.awaitUpdate(0, 1000));
+    }
+
     private MetadataResponse responseWithCurrentTopics() {
         return responseWithTopics(metadata.topics());
     }
diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
index b72cf3c..c96b172 100644
--- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -68,6 +68,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
     val clientLoginContext = JaasTestUtils.tokenClientLoginModule(token.tokenInfo().tokenId(),
token.hmacAsBase64String())
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
     consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+    adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
   }
 
   private def waitForScramCredentials(clientPrincipal: String): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 9be1f1c..57b8c39 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -19,7 +19,6 @@ package kafka.api
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Gauge
-
 import java.io.File
 import java.util.concurrent.ExecutionException
 
@@ -27,7 +26,7 @@ import kafka.admin.AclCommand
 import kafka.security.auth._
 import kafka.server._
 import kafka.utils._
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -36,7 +35,7 @@ import org.apache.kafka.common.resource.PatternType
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
-import org.scalatest.Assertions.fail
+import org.scalatest.Assertions.{assertThrows, fail, intercept}
 
 import scala.collection.JavaConverters._
 
@@ -289,7 +288,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness
with Sas
     }
   }
 
-  protected def setAclsAndProduce(tp: TopicPartition) {
+  private def setReadAndWriteAcls(tp: TopicPartition) {
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(consumeAclArgs(tp.topic))
     servers.foreach { s =>
@@ -297,19 +296,72 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness
with Sas
         new Resource(Topic, tp.topic, PatternType.LITERAL))
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get,
groupResource)
     }
+  }
+
+  protected def setAclsAndProduce(tp: TopicPartition) {
+    setReadAndWriteAcls(tp)
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
   }
 
+  private def setConsumerGroupAcls() {
+    AclCommand.main(groupAclArgs)
+    servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get,
groupResource)
+    }
+  }
+
   /**
-    * Tests that a producer fails to publish messages when the appropriate ACL
-    * isn't set.
+    * Tests that producer, consumer and adminClient fail to publish messages, consume
+    * messages and describe topics respectively when the describe ACL isn't set.
+    * Also verifies that subsequent publish, consume and describe to authorized topic succeeds.
     */
-  @Test(expected = classOf[TopicAuthorizationException])
-  def testNoProduceWithoutDescribeAcl(): Unit = {
+  @Test
+  def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(): Unit = {
+    // Set consumer group acls since we are testing topic authorization
+    setConsumerGroupAcls()
+
+    // Verify produce/consume/describe throw TopicAuthorizationException
     val producer = createProducer()
+    assertThrows[TopicAuthorizationException] { sendRecords(producer, numRecords, tp) }
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    assertThrows[TopicAuthorizationException] { consumeRecords(consumer, numRecords, topic
= tp.topic) }
+    val adminClient = createAdminClient()
+    val e1 = intercept[ExecutionException] { adminClient.describeTopics(Set(topic).asJava).all().get()
}
+    assertTrue("Unexpected exception " + e1.getCause, e1.getCause.isInstanceOf[TopicAuthorizationException])
+
+    // Verify successful produce/consume/describe on another topic using the same producer,
consumer and adminClient
+    val topic2 = "topic2"
+    val tp2 = new TopicPartition(topic2, 0)
+    setReadAndWriteAcls(tp2)
+    sendRecords(producer, numRecords, tp2)
+    consumer.assign(List(tp2).asJava)
+    consumeRecords(consumer, numRecords, topic = topic2)
+    val describeResults = adminClient.describeTopics(Set(topic, topic2).asJava).values
+    assertEquals(1, describeResults.get(topic2).get().partitions().size())
+    val e2 = intercept[ExecutionException] { adminClient.describeTopics(Set(topic).asJava).all().get()
}
+    assertTrue("Unexpected exception " + e2.getCause, e2.getCause.isInstanceOf[TopicAuthorizationException])
+
+    // Verify that consumer manually assigning both authorized and unauthorized topic doesn't
consume from either
+    consumer.assign(List(tp, tp2).asJava)
+    sendRecords(producer, numRecords, tp2)
+    def verifyNoRecords(records: ConsumerRecords[Array[Byte], Array[Byte]]): Boolean = {
+      assertTrue("Consumed records: " + records, records.isEmpty)
+      !records.isEmpty
+    }
+    assertThrows[TopicAuthorizationException] {
+      TestUtils.pollRecordsUntilTrue(consumer, verifyNoRecords, "Consumer didn't fail with
authorization exception within timeout")
+    }
+
+    // Add ACLs and verify successful produce/consume/describe on first topic
+    setReadAndWriteAcls(tp)
+    consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset =
numRecords, topic2)
     sendRecords(producer, numRecords, tp)
-    confirmReauthenticationMetrics
+    consumeRecords(consumer, numRecords, topic = topic)
+    val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).values
+    assertEquals(1, describeResults2.get(topic).get().partitions().size())
+    assertEquals(1, describeResults2.get(topic2).get().partitions().size())
   }
 
   @Test
@@ -343,13 +395,22 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness
with Sas
     confirmReauthenticationMetrics
   }
 
-  @Test(expected = classOf[TopicAuthorizationException])
+  @Test
   def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
     noConsumeWithoutDescribeAclSetup()
     val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
     // this should timeout since the consumer will not be able to fetch any metadata for
the topic
-    consumeRecords(consumer, timeout = 3000)
+    assertThrows[TopicAuthorizationException] { consumeRecords(consumer, timeout = 3000)
}
+
+    // Verify that no records are consumed even if one of the requested topics is authorized
+    setReadAndWriteAcls(tp)
+    consumer.subscribe(List(topic, "topic2").asJava)
+    assertThrows[TopicAuthorizationException] { consumeRecords(consumer, timeout = 3000)
}
+
+    // Verify that records are consumed if all topics are authorized
+    consumer.subscribe(List(topic).asJava)
+    consumeRecordsIgnoreOneAuthorizationException(consumer)
   }
 
   private def noConsumeWithoutDescribeAclSetup(): Unit = {
@@ -468,5 +529,17 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness
with Sas
       assertEquals(offset.toLong, record.offset)
     }
   }
+
+  // Consume records, ignoring at most one TopicAuthorization exception from previously sent
request
+  private def consumeRecordsIgnoreOneAuthorizationException(consumer: Consumer[Array[Byte],
Array[Byte]],
+                                                            numRecords: Int = 1,
+                                                            startingOffset: Int = 0,
+                                                            topic: String = topic): Unit
= {
+    try {
+      consumeRecords(consumer, numRecords, startingOffset, topic)
+    } catch {
+      case _: TopicAuthorizationException => consumeRecords(consumer, numRecords, startingOffset,
topic)
+    }
+  }
 }
 
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index aa6cb60..f51ec6e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -43,6 +43,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest
{
     val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
     consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
+    adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
     super.setUp()
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index bbe0dd8..fbfd01c 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -107,9 +107,11 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
   this.serverConfig.put(s"$mechanismPrefix${KafkaConfig.SaslServerCallbackHandlerClassProp}",
classOf[TestServerCallbackHandler].getName)
   this.producerConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
   this.consumerConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
+  this.adminClientConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
   private val plainLogin = s"org.apache.kafka.common.security.plain.PlainLoginModule username=$KafkaPlainUser
required;"
   this.producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin)
   this.consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin)
+  this.adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin)
 
   override protected def kafkaClientSaslMechanism = "PLAIN"
   override protected def kafkaServerSaslMechanisms = List("PLAIN")


Mime
View raw message