kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7549; Old ProduceRequest with zstd compression does not return error to client (#5925)
Date Mon, 10 Dec 2018 17:45:36 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a3dffb  KAFKA-7549; Old ProduceRequest with zstd compression does not return error to client (#5925)
7a3dffb is described below

commit 7a3dffb0ca1f7ff2bd838f42ca2f12b7a2fc3b0e
Author: Lee Dongjin <dongjin@apache.org>
AuthorDate: Tue Dec 11 02:45:18 2018 +0900

    KAFKA-7549; Old ProduceRequest with zstd compression does not return error to client (#5925)
    
    Older versions of the Produce API should return an error if zstd is used. This validation
    existed, but it was done during request parsing, which means that instead of returning an
    error code, the broker disconnected. This patch fixes the issue by moving the validation outside
    of the parsing logic. It also fixes several other record validations which had the same problem.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/common/requests/ProduceRequest.java      | 80 ++++++++++++++--------
 core/src/main/scala/kafka/server/KafkaApis.scala   | 13 +++-
 .../unit/kafka/server/ProduceRequestTest.scala     | 18 +++--
 3 files changed, 74 insertions(+), 37 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index f87090e..9f9de42 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
 import org.apache.kafka.common.protocol.Errors;
@@ -172,6 +173,21 @@ public class ProduceRequest extends AbstractRequest {
 
         @Override
         public ProduceRequest build(short version) {
+            return build(version, true);
+        }
+
+        // Visible for testing only
+        public ProduceRequest buildUnsafe(short version) {
+            return build(version, false);
+        }
+
+        private ProduceRequest build(short version, boolean validate) {
+            if (validate) {
+                // Validate the given records first
+                for (MemoryRecords records : partitionRecords.values()) {
+                    ProduceRequest.validateRecords(version, records);
+                }
+            }
             return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
         }
 
@@ -210,8 +226,9 @@ public class ProduceRequest extends AbstractRequest {
         this.partitionRecords = partitionRecords;
         this.partitionSizes = createPartitionSizes(partitionRecords);
 
-        for (MemoryRecords records : partitionRecords.values())
-            validateRecords(version, records);
+        for (MemoryRecords records : partitionRecords.values()) {
+            setFlags(records);
+        }
     }
 
     private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
@@ -231,7 +248,7 @@ public class ProduceRequest extends AbstractRequest {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.get(PARTITION_ID);
                 MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                validateRecords(version, records);
+                setFlags(records);
                 partitionRecords.put(new TopicPartition(topic, partition), records);
             }
         }
@@ -241,32 +258,11 @@ public class ProduceRequest extends AbstractRequest {
         transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
     }
 
-    private void validateRecords(short version, MemoryRecords records) {
-        if (version >= 3) {
-            Iterator<MutableRecordBatch> iterator = records.batches().iterator();
-            if (!iterator.hasNext())
-                throw new InvalidRecordException("Produce requests with version " + version + " must have at least " +
-                        "one record batch");
-
-            MutableRecordBatch entry = iterator.next();
-            if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
-                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
-                        "contain record batches with magic version 2");
-            if (version < 7 && entry.compressionType() == CompressionType.ZSTD) {
-                throw new InvalidRecordException("Produce requests with version " + version + " are note allowed to " +
-                    "use ZStandard compression");
-            }
-
-            if (iterator.hasNext())
-                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
-                        "contain exactly one record batch");
-            idempotent = entry.hasProducerId();
-            transactional = entry.isTransactional();
-        }
-
-        // Note that we do not do similar validation for older versions to ensure compatibility with
-        // clients which send the wrong magic version in the wrong version of the produce request. The broker
-        // did not do this validation before, so we maintain that behavior here.
+    private void setFlags(MemoryRecords records) {
+        Iterator<MutableRecordBatch> iterator = records.batches().iterator();
+        MutableRecordBatch entry = iterator.next();
+        idempotent = entry.hasProducerId();
+        transactional = entry.isTransactional();
     }
 
     /**
@@ -394,6 +390,32 @@ public class ProduceRequest extends AbstractRequest {
         partitionRecords = null;
     }
 
+    public static void validateRecords(short version, MemoryRecords records) {
+        if (version >= 3) {
+            Iterator<MutableRecordBatch> iterator = records.batches().iterator();
+            if (!iterator.hasNext())
+                throw new InvalidRecordException("Produce requests with version " + version + " must have at least " +
+                    "one record batch");
+
+            MutableRecordBatch entry = iterator.next();
+            if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
+                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
+                    "contain record batches with magic version 2");
+            if (version < 7 && entry.compressionType() == CompressionType.ZSTD) {
+                throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are note allowed to " +
+                    "use ZStandard compression");
+            }
+
+            if (iterator.hasNext())
+                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
+                    "contain exactly one record batch");
+        }
+
+        // Note that we do not do similar validation for older versions to ensure compatibility with
+        // clients which send the wrong magic version in the wrong version of the produce request. The broker
+        // did not do this validation before, so we maintain that behavior here.
+    }
+
     public static ProduceRequest parse(ByteBuffer buffer, short version) {
         return new ProduceRequest(ApiKeys.PRODUCE.parseRequest(version, buffer), version);
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 08d03c7..dfecfd3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, MultiRecordsSend, RecordBatch, RecordConversionStats, Records}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
@@ -408,6 +408,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
+    val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
     val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
 
     for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
@@ -416,12 +417,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       else if (!metadataCache.contains(topicPartition))
         nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
       else
-        authorizedRequestInfo += (topicPartition -> memoryRecords)
+        try {
+          ProduceRequest.validateRecords(request.header.apiVersion(), memoryRecords)
+          authorizedRequestInfo += (topicPartition -> memoryRecords)
+        } catch {
+          case e: ApiException =>
+            invalidRequestResponses += topicPartition -> new PartitionResponse(Errors.forException(e))
+        }
     }
 
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
-      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
+      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
       var errorInResponse = false
 
       mergedResponseStatus.foreach { case (topicPartition, status) =>
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index b1f3af1..906de71 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -133,11 +133,19 @@ class ProduceRequestTest extends BaseRequestTest {
     // produce request with v7: works fine!
     val res1 = sendProduceRequest(leader,
       new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava, null).build())
-    val (tp, partitionResponse) = res1.responses.asScala.head
-    assertEquals(topicPartition, tp)
-    assertEquals(Errors.NONE, partitionResponse.error)
-    assertEquals(0, partitionResponse.baseOffset)
-    assertEquals(-1, partitionResponse.logAppendTime)
+    val (tp1, partitionResponse1) = res1.responses.asScala.head
+    assertEquals(topicPartition, tp1)
+    assertEquals(Errors.NONE, partitionResponse1.error)
+    assertEquals(0, partitionResponse1.baseOffset)
+    assertEquals(-1, partitionResponse1.logAppendTime)
+
+    // produce request with v3: returns Errors.UNSUPPORTED_COMPRESSION_TYPE.
+    val res2 = sendProduceRequest(leader,
+      new ProduceRequest.Builder(3, 3, -1, 3000, partitionRecords.asJava, null)
+        .buildUnsafe(3))
+    val (tp2, partitionResponse2) = res2.responses.asScala.head
+    assertEquals(topicPartition, tp2)
+    assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, partitionResponse2.error)
   }
 
   private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = {


Mime
View raw message