kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.8 updated: KAFKA-12701: NPE in MetadataRequest when using topic IDs (#10885)
Date Tue, 15 Jun 2021 21:11:06 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new a6954a0  KAFKA-12701: NPE in MetadataRequest when using topic IDs (#10885)
a6954a0 is described below

commit a6954a0464ea586795eab0a8a585a45272d9a417
Author: Justine Olshan <jolshan@confluent.io>
AuthorDate: Tue Jun 15 14:09:28 2021 -0700

    KAFKA-12701: NPE in MetadataRequest when using topic IDs (#10885)
    
    We prevent handling MetadataRequests where the topic name is null (to prevent NPE) as
    well as prevent requests that set topic IDs since this functionality has not yet been
    implemented. When we do implement it in https://github.com/apache/kafka/pull/9769,
    we should bump the request/response version.
    
    Added tests to ensure the error is thrown.
    
    (cherry picked from commit c16711cb8e0d1c03f)
    
    Reviewers: dengziming <swzmdeng@163.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/common/requests/MetadataRequest.java     | 21 +++++++++++--
 .../resources/common/message/MetadataRequest.json  |  3 +-
 .../kafka/common/requests/MetadataRequestTest.java | 25 ++++++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 11 +++++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 35 ++++++++++++++++++++++
 5 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 816f600..d38e9ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic;
@@ -92,6 +93,16 @@ public class MetadataRequest extends AbstractRequest {
             if (!data.allowAutoTopicCreation() && version < 4)
                 throw new UnsupportedVersionException("MetadataRequest versions older than
4 don't support the " +
                         "allowAutoTopicCreation field");
+            if (data.topics() != null) {
+                data.topics().forEach(topic -> {
+                    if (topic.name() == null)
+                        throw new UnsupportedVersionException("MetadataRequest version "
+ version +
+                                " does not support null topic names.");
+                    if (topic.topicId() != Uuid.ZERO_UUID)
+                        throw new UnsupportedVersionException("MetadataRequest version "
+ version +
+                            " does not support non-zero topic IDs.");
+                });
+            }
             return new MetadataRequest(data, version);
         }
 
@@ -117,13 +128,17 @@ public class MetadataRequest extends AbstractRequest {
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         MetadataResponseData responseData = new MetadataResponseData();
-        if (topics() != null) {
-            for (String topic : topics())
+        if (data.topics() != null) {
+            for (MetadataRequestTopic topic : data.topics()) {
+                // the response does not allow null, so convert to empty string if necessary
+                String topicName = topic.name() == null ? "" : topic.name();
                 responseData.topics().add(new MetadataResponseData.MetadataResponseTopic()
-                    .setName(topic)
+                    .setName(topicName)
+                    .setTopicId(topic.topicId())
                     .setErrorCode(error.code())
                     .setIsInternal(false)
                     .setPartitions(Collections.emptyList()));
+            }
         }
 
         responseData.setThrottleTimeMs(throttleTimeMs);
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json
index e5083a8..a1634b1 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -33,7 +33,8 @@
     //
     // Version 9 is the first flexible version.
     //
-    // Version 10 adds topicId.
+    // Version 10 adds topicId and allows name field to be null. However, this functionality was not implemented on the server.
+    // Versions 10 and 11 should not use the topicId field or set topic name to null.
     //
     // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed
     // by the DescribeCluster API (KIP-700).
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
index e515232..74c217d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
@@ -16,16 +16,21 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class MetadataRequestTest {
 
@@ -65,4 +70,24 @@ public class MetadataRequestTest {
         assertEquals(minVersion, builder3.oldestAllowedVersion());
         assertEquals(maxVersion, builder3.latestAllowedVersion());
     }
+
+    @Test
+    public void testTopicIdAndNullTopicNameRequests() {
+        // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown.
+        List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
+                new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()),
+                new MetadataRequestData.MetadataRequestTopic().setName(null),
+                new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()),
+                new MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(Uuid.randomUuid()));
+
+        // if version is 10 or 11, the invalid topic metadata should return an error
+        List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
+        invalidVersions.forEach(version ->
+            topics.forEach(topic -> {
+                MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic));
+                MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData);
+                assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
+            })
+        );
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a9812fb..e3b4947 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1137,6 +1137,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions.
+    if (!metadataRequest.isAllTopics) {
+      metadataRequest.data.topics.forEach{ topic =>
+        if (topic.name == null) {
+          throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version}")
+        } else if (topic.topicId != Uuid.ZERO_UUID) {
+          throw new InvalidRequestException(s"Topic IDs are not supported in requests for
version ${metadataRequest.version}")
+        }
+      }
+    }
+
     val topics = if (metadataRequest.isAllTopics)
       metadataCache.getAllTopics()
     else
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b89b29c..f059b42 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1067,6 +1067,41 @@ class KafkaApisTest {
   }
 
   @Test
+  def testInvalidMetadataRequestReturnsError(): Unit = {
+    // Construct invalid MetadataRequestTopics. We will try each one separately and ensure the error is thrown.
+    val topics = List(new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()),
+      new MetadataRequestData.MetadataRequestTopic().setName(null),
+      new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()),
+      new MetadataRequestData.MetadataRequestTopic().setName("topic1").setTopicId(Uuid.randomUuid()))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager,
+      autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator,
txnCoordinator)
+
+    // if version is 10 or 11, the invalid topic metadata should return an error
+    val invalidVersions = Set(10, 11)
+    invalidVersions.foreach( version =>
+      topics.foreach(topic => {
+        val metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic))
+        val metadataRequest = new MetadataRequest(metadataRequestData, version.toShort)
+        val request = buildRequest(metadataRequest)
+        val kafkaApis = createKafkaApis()
+
+        val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+        EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+
+        EasyMock.replay(requestChannel)
+        kafkaApis.handle(request)
+
+        val response = readResponse(metadataRequest, capturedResponse).asInstanceOf[MetadataResponse]
+        assertEquals(1, response.topicMetadata.size)
+        assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST))
+        response.data.topics.forEach(topic => assertNotEquals(null, topic.name))
+        reset(requestChannel)
+      })
+    )
+  }
+
+  @Test
   def testOffsetCommitWithInvalidPartition(): Unit = {
     val topic = "topic"
     addTopicToMetadataCache(topic, numPartitions = 1)

Mime
View raw message