kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167)
Date Thu, 10 Oct 2019 21:45:35 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 18cfd13  KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167)
18cfd13 is described below

commit 18cfd13a9b211cbe272563544904d463c0ed93bb
Author: Tu V. Tran <tu@confluent.io>
AuthorDate: Thu Oct 10 14:44:37 2019 -0700

    KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167)
    
    All the changes are in ReplicaManager.appendToLocalLog and ReplicaManager.appendRecords. Also, replaced LogAppendInfo.unknownLogAppendInfoWithLogStartOffset with LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo to include those 2 new fields.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/common/requests/ProduceRequest.java      |  2 +-
 .../kafka/common/requests/ProduceResponse.java     | 99 ++++++++++++++--------
 .../resources/common/message/ProduceRequest.json   |  2 +-
 .../resources/common/message/ProduceResponse.json  | 12 +--
 .../apache/kafka/common/message/MessageTest.java   | 14 +--
 .../kafka/common/requests/RequestResponseTest.java |  3 +-
 .../kafka/common/RecordValidationException.scala   | 25 ++++++
 core/src/main/scala/kafka/log/Log.scala            | 18 +++-
 core/src/main/scala/kafka/log/LogValidator.scala   | 69 ++++++++++-----
 .../main/scala/kafka/server/ReplicaManager.scala   | 37 +++++---
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 10 +--
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 58 +++++++++++--
 .../unit/kafka/server/ProduceRequestTest.scala     | 36 +++++++-
 13 files changed, 285 insertions(+), 100 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 932473c..7b3ae1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -121,7 +121,7 @@ public class ProduceRequest extends AbstractRequest {
     private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6;
 
     /**
-     * V8 bumped up to add two new fields error_records offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse}
+     * V8 bumped up to add two new fields record_errors offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse}
      * (See KIP-467)
      */
     private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index a6df880..8bfc629 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -74,14 +74,14 @@ public class ProduceResponse extends AbstractResponse {
     private static final String BASE_OFFSET_KEY_NAME = "base_offset";
     private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time";
     private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
-    private static final String ERROR_RECORDS_KEY_NAME = "error_records";
-    private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset";
-    private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message";
+    private static final String RECORD_ERRORS_KEY_NAME = "record_errors";
+    private static final String BATCH_INDEX_KEY_NAME = "batch_index";
+    private static final String BATCH_INDEX_ERROR_MESSAGE_KEY_NAME = "batch_index_error_message";
     private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
 
     private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME,
             "The start offset of the log at the time this produce response was created", INVALID_OFFSET);
-    private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME,
+    private static final Field.NullableStr BATCH_INDEX_ERROR_MESSAGE_FIELD = new Field.NullableStr(BATCH_INDEX_ERROR_MESSAGE_KEY_NAME,
             "The error message of the record that caused the batch to be dropped");
     private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME,
             "The global error message summarizing the common root cause of the records that caused the batch to be dropped");
@@ -160,7 +160,7 @@ public class ProduceResponse extends AbstractResponse {
     private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6;
 
     /**
-     * V8 adds error_records and error_message. (see KIP-467)
+     * V8 adds record_errors and error_message. (see KIP-467)
      */
     public static final Schema PRODUCE_RESPONSE_V8 = new Schema(
             new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema(
@@ -174,11 +174,11 @@ public class ProduceResponse extends AbstractResponse {
                                     "If LogAppendTime is used for the topic, the timestamp will be the broker local " +
                                     "time when the messages are appended."),
                             LOG_START_OFFSET_FIELD,
-                            new Field(ERROR_RECORDS_KEY_NAME, new ArrayOf(new Schema(
-                                    new Field.Int32(RELATIVE_OFFSET_KEY_NAME, "The relative offset of the record " +
+                            new Field(RECORD_ERRORS_KEY_NAME, new ArrayOf(new Schema(
+                                    new Field.Int32(BATCH_INDEX_KEY_NAME, "The batch index of the record " +
                                             "that caused the batch to be dropped"),
-                                    RELATIVE_OFFSET_ERROR_MESSAGE_FIELD
-                            )), "The relative offsets of records that caused the batch to be dropped"),
+                                    BATCH_INDEX_ERROR_MESSAGE_FIELD
+                            )), "The batch indices of records that caused the batch to be dropped"),
                             ERROR_MESSAGE_FIELD)))))),
             THROTTLE_TIME_MS);
 
@@ -225,19 +225,24 @@ public class ProduceResponse extends AbstractResponse {
                 long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME);
                 long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
 
-                Map<Integer, String> errorRecords = new HashMap<>();
-                if (partRespStruct.hasField(ERROR_RECORDS_KEY_NAME)) {
-                    for (Object recordOffsetAndMessage : partRespStruct.getArray(ERROR_RECORDS_KEY_NAME)) {
-                        Struct recordOffsetAndMessageStruct = (Struct) recordOffsetAndMessage;
-                        Integer relativeOffset = recordOffsetAndMessageStruct.getInt(RELATIVE_OFFSET_KEY_NAME);
-                        String relativeOffsetErrorMessage = recordOffsetAndMessageStruct.getOrElse(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, "");
-                        errorRecords.put(relativeOffset, relativeOffsetErrorMessage);
+                List<RecordError> recordErrors = Collections.emptyList();
+                if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
+                    Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
+                    if (recordErrorsArray.length > 0) {
+                        recordErrors = new ArrayList<>(recordErrorsArray.length);
+                        for (Object indexAndMessage : recordErrorsArray) {
+                            Struct indexAndMessageStruct = (Struct) indexAndMessage;
+                            recordErrors.add(new RecordError(
+                                    indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
+                                    indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
+                            ));
+                        }
                     }
                 }
 
-                String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, "");
+                String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
                 TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, errorRecords, errorMessage));
+                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage));
             }
         }
         this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
@@ -266,19 +271,21 @@ public class ProduceResponse extends AbstractResponse {
                         .set(PARTITION_ID, partitionEntry.getKey())
                         .set(ERROR_CODE, errorCode)
                         .set(BASE_OFFSET_KEY_NAME, part.baseOffset);
-                if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME))
-                    partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
+                partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
                 partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);
 
-                List<Struct> errorRecords = new ArrayList<>();
-                for (Map.Entry<Integer, String> recordOffsetAndMessage : part.errorRecords.entrySet()) {
-                    Struct recordOffsetAndMessageStruct = partStruct.instance(ERROR_RECORDS_KEY_NAME)
-                            .set(RELATIVE_OFFSET_KEY_NAME, recordOffsetAndMessage.getKey())
-                            .setIfExists(RELATIVE_OFFSET_ERROR_MESSAGE_FIELD, recordOffsetAndMessage.getValue());
-                    errorRecords.add(recordOffsetAndMessageStruct);
+                List<Struct> recordErrors = Collections.emptyList();
+                if (!part.recordErrors.isEmpty()) {
+                    recordErrors = new ArrayList<>();
+                    for (RecordError indexAndMessage : part.recordErrors) {
+                        Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME)
+                                .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex)
+                                .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message);
+                        recordErrors.add(indexAndMessageStruct);
+                    }
                 }
 
-                partStruct.setIfExists(ERROR_RECORDS_KEY_NAME, errorRecords.toArray());
+                partStruct.setIfExists(RECORD_ERRORS_KEY_NAME, recordErrors.toArray());
 
                 partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
                 partitionArray.add(partStruct);
@@ -314,7 +321,7 @@ public class ProduceResponse extends AbstractResponse {
         public long baseOffset;
         public long logAppendTime;
         public long logStartOffset;
-        public Map<Integer, String> errorRecords;
+        public List<RecordError> recordErrors;
         public String errorMessage;
 
         public PartitionResponse(Errors error) {
@@ -322,19 +329,19 @@ public class ProduceResponse extends AbstractResponse {
         }
 
         public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) {
-            this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyMap(), null);
+            this(error, baseOffset, logAppendTime, logStartOffset, Collections.emptyList(), null);
         }
 
-        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords) {
-            this(error, baseOffset, logAppendTime, logStartOffset, errorRecords, "");
+        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors) {
+            this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, null);
         }
 
-        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, Map<Integer, String> errorRecords, String errorMessage) {
+        public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) {
             this.error = error;
             this.baseOffset = baseOffset;
             this.logAppendTime = logAppendTime;
             this.logStartOffset = logStartOffset;
-            this.errorRecords = errorRecords;
+            this.recordErrors = recordErrors;
             this.errorMessage = errorMessage;
         }
 
@@ -350,15 +357,35 @@ public class ProduceResponse extends AbstractResponse {
             b.append(logAppendTime);
             b.append(", logStartOffset: ");
             b.append(logStartOffset);
-            b.append(", errorRecords: ");
-            b.append(errorRecords);
+            b.append(", recordErrors: ");
+            b.append(recordErrors);
             b.append(", errorMessage: ");
-            b.append(errorMessage);
+            if (errorMessage != null) {
+                b.append(errorMessage);
+            } else {
+                b.append("null");
+            }
             b.append('}');
             return b.toString();
         }
     }
 
+    public static final class RecordError {
+        public final int batchIndex;
+        public final String message;
+
+        public RecordError(int batchIndex, String message) {
+            this.batchIndex = batchIndex;
+            this.message = message;
+        }
+
+        public RecordError(int batchIndex) {
+            this.batchIndex = batchIndex;
+            this.message = null;
+        }
+
+    }
+
     public static ProduceResponse parse(ByteBuffer buffer, short version) {
         return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer));
     }
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json
index 4fcedf1..a872da7 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -29,7 +29,7 @@
   //
   // Starting in version 7, records can be produced using ZStandard compression.  See KIP-110.
   //
-  // Version 8 is the same as version 7 (but see KIP-467 for the response changes).
+  // Starting in Version 8, response has RecordErrors and ErrorMEssage. See KIP-467.
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json
index c21b0aa..77ab065 100644
--- a/clients/src/main/resources/common/message/ProduceResponse.json
+++ b/clients/src/main/resources/common/message/ProduceResponse.json
@@ -28,7 +28,7 @@
   // Version 5 added LogStartOffset to filter out spurious
   // OutOfOrderSequenceExceptions on the client.
   //
-  // Version 8 added ErrorRecords and ErrorMessage to include information about
+  // Version 8 added RecordErrors and ErrorMessage to include information about
   // records that cause the whole batch to be dropped.  See KIP-467 for details.
   "validVersions": "0-8",
   "flexibleVersions": "none",
@@ -49,11 +49,11 @@
           "about": "The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1.  If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended." },
         { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
           "about": "The log start offset." },
-        { "name": "ErrorRecords", "type": "[]RelativeOffsetAndErrorMessage", "versions": "8+", "ignorable": true,
-          "about": "The relative offsets of records that caused the batch to be dropped", "fields": [
-          { "name":  "RelativeOffset", "type": "int32", "versions":  "8+",
-            "about":  "The relative offset of the record that cause the batch to be dropped" },
-          { "name": "RelativeOffsetErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+",
+        { "name": "RecordErrors", "type": "[]BatchIndexAndErrorMessage", "versions": "8+", "ignorable": true,
+          "about": "The batch indices of records that caused the batch to be dropped", "fields": [
+          { "name":  "BatchIndex", "type": "int32", "versions":  "8+",
+            "about":  "The batch index of the record that cause the batch to be dropped" },
+          { "name": "BatchIndexErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+",
             "about":  "The error message of the record that caused the batch to be dropped"}
         ]},
         { "name":  "ErrorMessage", "type": "string", "default": "null", "versions": "8+", "nullableVersions": "8+", "ignorable":  true,
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 73bb2af..c7dedf9 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -516,8 +516,8 @@ public final class MessageTest {
         int throttleTimeMs = 1234;
         long logAppendTimeMs = 1234L;
         long logStartOffset = 1234L;
-        int relativeOffset = 0;
-        String relativeOffsetErrorMessage = "error message";
+        int batchIndex = 0;
+        String batchIndexErrorMessage = "error message";
         String errorMessage = "global error message";
 
         testAllMessageRoundTrips(new ProduceResponseData()
@@ -542,10 +542,10 @@ public final class MessageTest {
                                          .setBaseOffset(baseOffset)
                                          .setLogAppendTimeMs(logAppendTimeMs)
                                          .setLogStartOffset(logStartOffset)
-                                         .setErrorRecords(singletonList(
-                                             new ProduceResponseData.RelativeOffsetAndErrorMessage()
-                                                 .setRelativeOffset(relativeOffset)
-                                                 .setRelativeOffsetErrorMessage(relativeOffsetErrorMessage)))
+                                         .setRecordErrors(singletonList(
+                                             new ProduceResponseData.BatchIndexAndErrorMessage()
+                                                 .setBatchIndex(batchIndex)
+                                                 .setBatchIndexErrorMessage(batchIndexErrorMessage)))
                                          .setErrorMessage(errorMessage)))))
                       .setThrottleTimeMs(throttleTimeMs);
 
@@ -553,7 +553,7 @@ public final class MessageTest {
             ProduceResponseData responseData = response.get();
 
             if (version < 8) {
-                responseData.responses().get(0).partitions().get(0).setErrorRecords(Collections.emptyList());
+                responseData.responses().get(0).partitions().get(0).setRecordErrors(Collections.emptyList());
                 responseData.responses().get(0).partitions().get(0).setErrorMessage(null);
             }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 32b0a00..c8bd751 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1223,7 +1223,8 @@ public class RequestResponseTest {
     private ProduceResponse createProduceResponseWithErrorMessage() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonMap(0, "error message"), "global error message"));
+                10000, RecordBatch.NO_TIMESTAMP, 100, Collections.singletonList(new ProduceResponse.RecordError(0, "error message")),
+                "global error message"));
         return new ProduceResponse(responseData, 0);
     }
 
diff --git a/core/src/main/scala/kafka/common/RecordValidationException.scala b/core/src/main/scala/kafka/common/RecordValidationException.scala
new file mode 100644
index 0000000..2acdf84
--- /dev/null
+++ b/core/src/main/scala/kafka/common/RecordValidationException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import org.apache.kafka.common.errors.ApiException
+import org.apache.kafka.common.requests.ProduceResponse.RecordError
+
+class RecordValidationException(val invalidException: ApiException,
+                                val recordErrors: List[RecordError]) extends RuntimeException {
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b0af105..94997a1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.requests.ProduceResponse.RecordError
 import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
@@ -54,7 +55,18 @@ object LogAppendInfo {
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
     LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
+      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1,
+      offsetsMonotonic = false, -1L)
+
+  /**
+   * In ProduceResponse V8+, we add two new fields record_errors and error_message (see KIP-467).
+   * For any record failures with InvalidTimestamp or InvalidRecordException, we construct a LogAppendInfo object like the one
+   * in unknownLogAppendInfoWithLogStartOffset, but with additiona fields recordErrors and errorMessage
+   */
+  def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, recordErrors: List[RecordError], errorMessage: String): LogAppendInfo =
+    LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1,
+      offsetsMonotonic = false, -1L, recordErrors, errorMessage)
 }
 
 /**
@@ -87,7 +99,9 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          shallowCount: Int,
                          validBytes: Int,
                          offsetsMonotonic: Boolean,
-                         lastOffsetOfFirstBatch: Long) {
+                         lastOffsetOfFirstBatch: Long,
+                         recordErrors: List[RecordError] = List(),
+                         errorMessage: String = null) {
   /**
    * Get the first offset if it exists, else get the last offset of the first batch
    * For magic versions 2 and newer, this method will return first offset. For magic versions
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 6bba8eb..70bf3bf 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -19,7 +19,7 @@ package kafka.log
 import java.nio.ByteBuffer
 
 import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
-import kafka.common.LongRef
+import kafka.common.{LongRef, RecordValidationException}
 import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
 import kafka.server.BrokerTopicStats
 import kafka.utils.Logging
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampE
 import org.apache.kafka.common.record.{AbstractRecords, BufferSupplier, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
 import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.ProduceResponse.RecordError
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Seq, mutable}
@@ -146,11 +147,14 @@ private[kafka] object LogValidator extends Logging {
       throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic")
   }
 
-  private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, now: Long, timestampType: TimestampType,
-                             timestampDiffMaxMs: Long, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats): Unit = {
+  private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record, batchIndex: Int, now: Long,
+                             timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean,
+                             brokerTopicStats: BrokerTopicStats): Unit = {
     if (!record.hasMagic(batch.magic)) {
       brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
-      throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.")
+      throw new RecordValidationException(
+        new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition."),
+        List(new RecordError(batchIndex)))
     }
 
     // verify the record-level CRC only if this is one of the deep entries of a compressed message
@@ -167,8 +171,8 @@ private[kafka] object LogValidator extends Logging {
       }
     }
 
-    validateKey(record, topicPartition, compactedTopic, brokerTopicStats)
-    validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs)
+    validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats)
+    validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs)
   }
 
   private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
@@ -201,8 +205,8 @@ private[kafka] object LogValidator extends Logging {
     for (batch <- records.batches.asScala) {
       validateBatch(topicPartition, firstBatch, batch, isFromClient, toMagicValue, brokerTopicStats)
 
-      for (record <- batch.asScala) {
-        validateRecord(batch, topicPartition, record, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+      for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
         builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
       }
     }
@@ -232,6 +236,7 @@ private[kafka] object LogValidator extends Logging {
                                          magic: Byte,
                                          brokerTopicStats: BrokerTopicStats): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
+    val expectedInnerOffset = new LongRef(0)
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
 
@@ -243,8 +248,19 @@ private[kafka] object LogValidator extends Logging {
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
 
-      for (record <- batch.asScala) {
-        validateRecord(batch, topicPartition, record, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+      for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+        validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+
+        val expectedOffset = expectedInnerOffset.getAndIncrement()
+
+        // inner records offset should always be continuous
+        if (record.offset != expectedOffset) {
+          brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
+          throw new RecordValidationException(
+            new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
+            List(new RecordError(batchIndex)))
+        }
+
         val offset = offsetCounter.getAndIncrement()
         if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
           maxBatchTimestamp = record.timestamp
@@ -349,11 +365,13 @@ private[kafka] object LogValidator extends Logging {
         batch.streamingIterator(BufferSupplier.NO_CACHING)
 
       try {
-        for (record <- batch.asScala) {
+        for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
-            throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-              s"compression attribute set: $record")
-          validateRecord(batch, topicPartition, record, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
+            throw new RecordValidationException(
+              new InvalidRecordException(s"Compressed outer record should not have an inner record with a compression attribute set: $record"),
+              List(new RecordError(batchIndex)))
+
+          validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats)
 
           uncompressedSizeInBytes += record.sizeInBytes()
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
@@ -361,7 +379,9 @@ private[kafka] object LogValidator extends Logging {
             val expectedOffset = expectedInnerOffset.getAndIncrement()
             if (record.offset != expectedOffset) {
               brokerTopicStats.allTopicsStats.invalidOffsetOrSequenceRecordsPerSec.mark()
-              throw new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition.")
+              throw new RecordValidationException(
+                new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset in topic partition $topicPartition."),
+                List(new RecordError(batchIndex)))
             }
             if (record.timestamp > maxTimestamp)
               maxTimestamp = record.timestamp
@@ -456,10 +476,12 @@ private[kafka] object LogValidator extends Logging {
       recordConversionStats = recordConversionStats)
   }
 
-  private def validateKey(record: Record, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
+  private def validateKey(record: Record, batchIndex: Int, topicPartition: TopicPartition, compactedTopic: Boolean, brokerTopicStats: BrokerTopicStats) {
     if (compactedTopic && !record.hasKey) {
       brokerTopicStats.allTopicsStats.noKeyCompactedTopicRecordsPerSec.mark()
-      throw new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition.")
+      throw new RecordValidationException(
+        new InvalidRecordException(s"Compacted topic cannot accept message without key in topic partition $topicPartition."),
+        List(new RecordError(batchIndex)))
     }
   }
 
@@ -469,17 +491,22 @@ private[kafka] object LogValidator extends Logging {
    */
   private def validateTimestamp(batch: RecordBatch,
                                 record: Record,
+                                batchIndex: Int,
                                 now: Long,
                                 timestampType: TimestampType,
                                 timestampDiffMaxMs: Long): Unit = {
     if (timestampType == TimestampType.CREATE_TIME
       && record.timestamp != RecordBatch.NO_TIMESTAMP
       && math.abs(record.timestamp - now) > timestampDiffMaxMs)
-      throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
-        s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
+      throw new RecordValidationException(
+        new InvalidTimestampException(s"Timestamp ${record.timestamp} of message with offset ${record.offset} is " +
+          s"out of range. The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]"),
+        List(new RecordError(batchIndex)))
     if (batch.timestampType == TimestampType.LOG_APPEND_TIME)
-      throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
-        s"timestamp type to LogAppendTime.")
+      throw new RecordValidationException(
+        new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
+          s"timestamp type to LogAppendTime."),
+        List(new RecordError(batchIndex)))
   }
 
   case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8a74aa7..d10c936 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.locks.Lock
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition}
+import kafka.common.RecordValidationException
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
 import kafka.metrics.KafkaMetricsGroup
@@ -32,9 +33,7 @@ import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.ElectionType
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{ElectionType, Node, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -499,7 +498,8 @@ class ReplicaManager(val config: KafkaConfig,
         topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status
+                  new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
+                    result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
       }
 
       recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
@@ -753,6 +753,19 @@ class ReplicaManager(val config: KafkaConfig,
                                isFromClient: Boolean,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
+
+    def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
+      val logStartOffset = getPartition(topicPartition) match {
+        case HostedPartition.Online(partition) => partition.logStartOffset
+        case HostedPartition.None | HostedPartition.Offline => -1L
+      }
+      brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
+      brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
+      error(s"Error processing append operation on partition $topicPartition", t)
+
+      logStartOffset
+    }
+
     trace(s"Append [$entriesPerPartition] to local log")
     entriesPerPartition.map { case (topicPartition, records) =>
       brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
@@ -786,17 +799,15 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordTooLargeException |
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
-                   _: KafkaStorageException |
-                   _: InvalidTimestampException) =>
+                   _: KafkaStorageException) =>
             (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
+          case rve: RecordValidationException =>
+            val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
+            val recordErrors = rve.recordErrors
+            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
+              logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException)))
           case t: Throwable =>
-            val logStartOffset = getPartition(topicPartition) match {
-              case HostedPartition.Online(partition) => partition.logStartOffset
-              case HostedPartition.None | HostedPartition.Offline => -1L
-            }
-            brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
-            brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
-            error(s"Error processing append operation on partition $topicPartition", t)
+            val logStartOffset = processFailedRecord(topicPartition, t)
             (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
         }
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 29b564e..c5de28e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -25,13 +25,13 @@ import java.util.{Collections, Optional, Properties}
 
 import com.yammer.metrics.Metrics
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
-import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
+import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException}
 import kafka.log.Log.DeleteDirSuffix
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -1850,19 +1850,19 @@ class LogTest {
       log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case _: InvalidRecordException => // this is good
+      case _: RecordValidationException => // this is good
     }
     try {
       log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case _: InvalidRecordException => // this is good
+      case _: RecordValidationException => // this is good
     }
     try {
       log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case _: InvalidRecordException => // this is good
+      case _: RecordValidationException => // this is good
     }
 
     // check if metric for NoKeyCompactedTopicRecordsPerSec is logged
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 7fd1321..923ae91 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.Metrics
 import kafka.api.{ApiVersion, KAFKA_2_0_IV1, KAFKA_2_3_IV1}
-import kafka.common.LongRef
+import kafka.common.{LongRef, RecordValidationException}
 import kafka.message._
 import kafka.server.BrokerTopicStats
 import kafka.utils.TestUtils.meterCount
@@ -81,7 +81,7 @@ class LogValidatorTest {
   }
 
   private def checkMismatchMagic(batchMagic: Byte, recordMagic: Byte, compressionType: CompressionType): Unit = {
-    assertThrows[InvalidRecordException] {
+    assertThrows[RecordValidationException] {
       validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compressionType), batchMagic, compressionType, compressionType)
     }
     assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMagicNumberRecordsPerSec}")), 1)
@@ -574,7 +574,7 @@ class LogValidatorTest {
     checkCompressed(RecordBatch.MAGIC_VALUE_V2)
   }
 
-  @Test(expected = classOf[InvalidTimestampException])
+  @Test(expected = classOf[RecordValidationException])
   def testInvalidCreateTimeNonCompressedV1(): Unit = {
     val now = System.currentTimeMillis()
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L,
@@ -597,7 +597,7 @@ class LogValidatorTest {
       brokerTopicStats = brokerTopicStats)
   }
 
-  @Test(expected = classOf[InvalidTimestampException])
+  @Test(expected = classOf[RecordValidationException])
   def testInvalidCreateTimeNonCompressedV2(): Unit = {
     val now = System.currentTimeMillis()
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
@@ -620,7 +620,7 @@ class LogValidatorTest {
       brokerTopicStats = brokerTopicStats)
   }
 
-  @Test(expected = classOf[InvalidTimestampException])
+  @Test(expected = classOf[RecordValidationException])
   def testInvalidCreateTimeCompressedV1(): Unit = {
     val now = System.currentTimeMillis()
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V1, timestamp = now - 1001L,
@@ -643,7 +643,7 @@ class LogValidatorTest {
       brokerTopicStats = brokerTopicStats)
   }
 
-  @Test(expected = classOf[InvalidTimestampException])
+  @Test(expected = classOf[RecordValidationException])
   def testInvalidCreateTimeCompressedV2(): Unit = {
     val now = System.currentTimeMillis()
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
@@ -1250,6 +1250,52 @@ class LogValidatorTest {
     testBatchWithoutRecordsNotAllowed(NoCompressionCodec, DefaultCompressionCodec)
   }
 
+  @Test
+  def testInvalidTimestampExceptionHasBatchIndex(): Unit = {
+    val now = System.currentTimeMillis()
+    val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, timestamp = now - 1001L,
+      codec = CompressionType.GZIP)
+    val e = intercept[RecordValidationException] {
+      LogValidator.validateMessagesAndAssignOffsets(
+        records,
+        topicPartition,
+        offsetCounter = new LongRef(0),
+        time = time,
+        now = System.currentTimeMillis(),
+        sourceCodec = DefaultCompressionCodec,
+        targetCodec = DefaultCompressionCodec,
+        magic = RecordBatch.MAGIC_VALUE_V1,
+        compactedTopic = false,
+        timestampType = TimestampType.CREATE_TIME,
+        timestampDiffMaxMs = 1000L,
+        partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+        isFromClient = true,
+        interBrokerProtocolVersion = ApiVersion.latestVersion,
+        brokerTopicStats = brokerTopicStats)
+    }
+
+    assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
+    assertTrue(e.recordErrors.nonEmpty)
+    assertEquals(e.recordErrors.size, 1)
+    assertEquals(e.recordErrors.head.batchIndex, 0)
+    assertNull(e.recordErrors.head.message)
+  }
+
+  @Test
+  def testInvalidRecordExceptionHasBatchIndex(): Unit = {
+    val e = intercept[RecordValidationException] {
+      validateMessages(recordsWithInvalidInnerMagic(
+        RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP),
+        RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
+    }
+
+    assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
+    assertTrue(e.recordErrors.nonEmpty)
+    assertEquals(e.recordErrors.size, 1)
+    assertEquals(e.recordErrors.head.batchIndex, 0)
+    assertNull(e.recordErrors.head.message)
+  }
+
   private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = {
     val offset = 1234567
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index bedf6ff..3bc8d0a 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.nio.ByteBuffer
 import java.util.Properties
 
 import com.yammer.metrics.Metrics
@@ -56,7 +57,7 @@ class ProduceRequestTest extends BaseRequestTest {
       assertEquals(Errors.NONE, partitionResponse.error)
       assertEquals(expectedOffset, partitionResponse.baseOffset)
       assertEquals(-1, partitionResponse.logAppendTime)
-      assertTrue(partitionResponse.errorRecords.isEmpty)
+      assertTrue(partitionResponse.recordErrors.isEmpty)
       partitionResponse
     }
 
@@ -69,6 +70,39 @@ class ProduceRequestTest extends BaseRequestTest {
   }
 
   @Test
+  def testProduceWithInvalidTimestamp(): Unit = {
+    val topic = "topic"
+    val partition = 0
+    val topicConfig = new Properties
+    topicConfig.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
+    val leader = partitionToLeader(partition)
+
+    def createRecords(magicValue: Byte,
+                      timestamp: Long = RecordBatch.NO_TIMESTAMP,
+                      codec: CompressionType): MemoryRecords = {
+      val buf = ByteBuffer.allocate(512)
+      val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+      builder.appendWithOffset(0, timestamp, null, "hello".getBytes)
+      builder.appendWithOffset(1, timestamp, null, "there".getBytes)
+      builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes)
+      builder.build()
+    }
+
+    val records = createRecords(RecordBatch.MAGIC_VALUE_V2, System.currentTimeMillis() - 1001L, CompressionType.GZIP)
+    val topicPartition = new TopicPartition("topic", partition)
+    val partitionRecords = Map(topicPartition -> records)
+    val produceResponse = sendProduceRequest(leader, ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
+    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+    assertEquals(topicPartition, tp)
+    assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
+    assertEquals(1, partitionResponse.recordErrors.size())
+    assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex)
+    assertNull(partitionResponse.recordErrors.get(0).message)
+    assertNotNull(partitionResponse.errorMessage)
+  }
+
+  @Test
   def testProduceToNonReplica(): Unit = {
     val topic = "topic"
     val partition = 0


Mime
View raw message