kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5235)
Date Fri, 15 Jun 2018 20:28:20 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new ce4f4e1  KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5235)
ce4f4e1 is described below

commit ce4f4e1ab9ab800f7ea962570cc29e5b00e12f23
Author: Anna Povzner <anna@confluent.io>
AuthorDate: Fri Jun 15 13:26:32 2018 -0700

    KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5235)
    
    It is possible for the log start offset to fall in the middle of a batch after
    AdminClient#deleteRecords(). This causes a follower that starts fetching from the
    log start offset to fail fetching (all records). Use-cases in which a follower will
    start fetching from the log start offset include: 1) a new replica created due to
    partition re-assignment; 2) a new local replica created as a result of
    AdminClient#alterReplicaLogDirs(); 3) a broker that was down for some time while
    AdminClient#deleteRecords() moved the log start [...]
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
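
    For illustration, a minimal sketch of the trigger, assuming a client version that
    exposes AdminClient#deleteRecords, a broker at localhost:9092, and a topic
    test-topic whose partition 0 holds a single 3-record batch at offsets 0-2 (all
    names and offsets here are illustrative, not part of this patch):

        import java.util.Properties
        import scala.collection.JavaConverters._
        import org.apache.kafka.clients.admin.{AdminClient, RecordsToDelete}
        import org.apache.kafka.common.TopicPartition

        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        val admin = AdminClient.create(props)
        val tp = new TopicPartition("test-topic", 0)
        // Deleting records before offset 1 moves the log start offset into the
        // middle of the on-disk batch, which still begins at base offset 0.
        admin.deleteRecords(Map(tp -> RecordsToDelete.beforeOffset(1L)).asJava).all().get()
        admin.close()
        // A follower that now starts fetching from log start offset 1 receives the
        // whole batch with base offset 0; before this fix, the follower rejected the
        // append because offset 0 is below its expected next offset.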
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  32 +++-
 .../kafka/common/OffsetsOutOfOrderException.scala  |  25 +++
 .../common/UnexpectedAppendOffsetException.scala   |  29 ++++
 core/src/main/scala/kafka/log/Log.scala            |  24 ++-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   3 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 173 +++++++++++++++++++++
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  70 ++++++++-
 7 files changed, 346 insertions(+), 10 deletions(-)
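
For orientation before the diff: the fix restarts an empty follower log from the first
batch's base offset rather than from the fetch offset. A minimal standalone sketch using
only the public record classes (offsets chosen to match the tests below; illustrative,
not part of this patch):

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.record._

    // One 3-record batch with base offset 4.
    val batch = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 4L, CompressionType.NONE,
      new SimpleRecord("k1".getBytes, "v1".getBytes),
      new SimpleRecord("k2".getBytes, "v2".getBytes),
      new SimpleRecord("k3".getBytes, "v3".getBytes))
    // If deleteRecords had moved the log start offset to 5, a fetch from offset 5 would
    // still return this whole batch; the recovery path added below starts the empty
    // follower log from the batch's base offset:
    println(batch.batches.asScala.head.baseOffset)  // prints 4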

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e038b58..55edd10 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
+import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
@@ -29,7 +30,7 @@ import kafka.server._
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
@@ -155,6 +156,10 @@ class Partition(val topic: String,
 
   def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(assignedReplicaMap.get(replicaId))
 
+  def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+    getReplica(replicaId).getOrElse(
+      throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition"))
+
   def leaderReplicaIfLocal: Option[Replica] =
     leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
 
@@ -486,6 +491,31 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
+  def appendRecordsToFollower(records: MemoryRecords) {
+    try {
+      getReplicaOrException().log.get.appendAsFollower(records)
+    } catch {
+      case e: UnexpectedAppendOffsetException =>
+        val replica = getReplicaOrException()
+        val logEndOffset = replica.logEndOffset.messageOffset
+        if (logEndOffset == replica.logStartOffset &&
+            e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+          // This may happen if the log start offset on the leader (or current replica) falls in
+          // the middle of the batch due to a delete records request and the follower tries to
+          // fetch its first offset from the leader.
+          // We handle this case here instead of Log#append() because we will need to remove the
+          // segment that starts with the log start offset and create a new one with an earlier offset
+          // (base offset of the batch), which will move recoveryPoint backwards, so we will need
+          // to checkpoint the new recovery point before we append.
+          info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${replica.logStartOffset}." +
+               s" Since this is the first record to be appended to the follower's log, will start the log from offset ${e.firstOffset}.")
+          logManager.truncateFullyAndStartAt(topicPartition, e.firstOffset)
+          replica.log.get.appendAsFollower(records)
+        } else
+          throw e
+    }
+  }
+
   def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal match {
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
new file mode 100644
index 0000000..f8daaa4
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower received records with non-monotonically increasing offsets
+ */
+class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) {
+}
+
diff --git a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
new file mode 100644
index 0000000..e719a93
--- /dev/null
+++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower or the future replica received records from the leader (or current
+ * replica) with a first offset less than the expected next offset.
+ * @param firstOffset The first offset of the records to append
+ * @param lastOffset  The last offset of the records to append
+ */
+class UnexpectedAppendOffsetException(val message: String,
+                                      val firstOffset: Long,
+                                      val lastOffset: Long) extends RuntimeException(message) {
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index caa7bf5..9157ee1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
 
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common.{InvalidOffsetException, KafkaException, LongRef}
+import kafka.common.{InvalidOffsetException, KafkaException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
@@ -618,6 +618,8 @@ class Log(@volatile var dir: File,
   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
   * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
    * @throws KafkaStorageException If the append fails due to an I/O error.
+   * @throws OffsetsOutOfOrderException If out-of-order offsets are found in 'records'
+   * @throws UnexpectedAppendOffsetException If the first or last offset of the append is less than the next offset
    * @return Information about the appended messages including the first and last offset.
    */
  private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
@@ -679,8 +681,24 @@ class Log(@volatile var dir: File,
           }
         } else {
           // we are taking the offsets we are given
-          if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
-            throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
+          if (!appendInfo.offsetsMonotonic)
+            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
+                                                 records.records.asScala.map(_.offset))
+
+          if (appendInfo.firstOffset < nextOffsetMetadata.messageOffset) {
+            // we may still be able to recover if the log is empty
+            // one example: fetching from log start offset on the leader which is not batch aligned,
+            // which may happen as a result of AdminClient#deleteRecords()
+            // appendInfo.firstOffset may be either the first offset or the last offset of the first batch.
+            // get the actual first offset, which may require decompressing the data
+            val firstOffset = records.batches.asScala.head.baseOffset()
+            throw new UnexpectedAppendOffsetException(
+              s"Unexpected offset in append to $topicPartition. First offset or last offset of the first batch " +
+              s"${appendInfo.firstOffset} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
+              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
+              firstOffset, appendInfo.lastOffset)
+          }
         }
 
         // update the epoch cache with the epoch stamped onto the message by the leader
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 2fb04486..af5763a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -87,6 +87,7 @@ class ReplicaFetcherThread(name: String,
   // process fetched data
  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
     val replica = replicaMgr.getReplicaOrException(topicPartition)
+    val partition = replicaMgr.getPartition(topicPartition).get
     val records = partitionData.toRecords
 
     maybeWarnIfOversizedRecords(records, topicPartition)
@@ -99,7 +100,7 @@ class ReplicaFetcherThread(name: String,
        .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
-    replica.log.get.appendAsFollower(records)
+    partition.appendRecordsToFollower(records)
 
     if (logger.isTraceEnabled)
       trace("Follower has replica log end offset %d after appending %d bytes of messages
for partition %s"
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
new file mode 100644
index 0000000..2798b5a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.UnexpectedAppendOffsetException
+import kafka.log.{LogConfig, LogManager, CleanerConfig}
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, MockScheduler}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.Assertions.assertThrows
+import scala.collection.JavaConverters._
+
+class PartitionTest {
+
+  val brokerId = 101
+  val topicPartition = new TopicPartition("test-topic", 0)
+  val time = new MockTime()
+  val brokerTopicStats = new BrokerTopicStats
+  val metrics = new Metrics
+
+  var tmpDir: File = _
+  var logDir: File = _
+  var replicaManager: ReplicaManager = _
+  var logManager: LogManager = _
+  var logConfig: LogConfig = _
+
+  @Before
+  def setup(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+    logConfig = LogConfig(logProps)
+
+    tmpDir = TestUtils.tempDir()
+    logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    logManager = TestUtils.createLogManager(
+      logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
+    logManager.startup()
+
+    val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
+    brokerProps.put("log.dir", logDir.getAbsolutePath)
+    val brokerConfig = KafkaConfig.fromProps(brokerProps)
+    replicaManager = new ReplicaManager(
+      config = brokerConfig, metrics, time, zkUtils = null, new MockScheduler(time),
+      logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, "").follower,
+      brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
+      brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
+  }
+
+  @After
+  def tearDown(): Unit = {
+    brokerTopicStats.close()
+    metrics.close()
+
+    logManager.shutdown()
+    Utils.delete(tmpDir)
+    logManager.liveLogDirs.foreach(Utils.delete)
+    replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
+  def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(Some(replica), partition.getReplica(replica.brokerId))
+
+    val initialLogStartOffset = 5L
+    logManager.truncateFullyAndStartAt(topicPartition, initialLogStartOffset)
+    assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:",
+                 initialLogStartOffset, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after truncate fully and start at $initialLogStartOffset:",
+                 initialLogStartOffset, replica.logStartOffset)
+
+    // verify that we cannot append records that do not contain log start offset even if the log is empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      // append one record with offset = 3
+      partition.appendRecordsToFollower(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L))
+    }
+    assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset,
replica.logEndOffset.messageOffset)
+
+    // verify that we can append records that contain log start offset, even when first
+    // offset < log start offset if the log is empty
+    val newLogStartOffset = 4L
+    val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                     new SimpleRecord("k2".getBytes, "v2".getBytes),
+                                     new SimpleRecord("k3".getBytes, "v3".getBytes)),
+                                baseOffset = newLogStartOffset)
+    partition.appendRecordsToFollower(records)
+    assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:",
7L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:",
newLogStartOffset, replica.logStartOffset)
+
+    // and we can append more records after that
+    partition.appendRecordsToFollower(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L))
+    assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+
+    // but we cannot append to offset < log start if the log is not empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                        new SimpleRecord("k2".getBytes, "v2".getBytes)),
+                                   baseOffset = 3L)
+      partition.appendRecordsToFollower(records2)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", 8L, replica.logEndOffset.messageOffset)
+
+    // we still can append to next offset
+    partition.appendRecordsToFollower(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L))
+    assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+  }
+
+  @Test
+  def testGetReplica(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    assertEquals(None, partition.getReplica(brokerId))
+    assertThrows[ReplicaNotAvailableException] {
+      partition.getReplicaOrException(brokerId)
+    }
+
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(replica, partition.getReplicaOrException(brokerId))
+  }
+
+  @Test
+  def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    assertThrows[ReplicaNotAvailableException] {
+      partition.appendRecordsToFollower(
+           createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L))
+    }
+  }
+
+  def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
+    val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+    val builder = MemoryRecords.builder(
+      buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME,
+      baseOffset, time.milliseconds, partitionLeaderEpoch)
+    records.foreach(builder.append)
+    builder.build()
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3c8b01e..41f6ab7 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import java.util.Properties
 
 import org.apache.kafka.common.errors._
-import kafka.common.KafkaException
+import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import kafka.utils._
@@ -39,6 +39,7 @@ import org.easymock.EasyMock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import org.scalatest.Assertions.{assertThrows, intercept, withClue}
 
 class LogTest {
 
@@ -1862,13 +1863,72 @@ class LogTest {
     assertTrue("Message payload should be null.", !head.hasValue)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test
   def testAppendWithOutOfOrderOffsetsThrowsException() {
-    val log = createLog(logDir, LogConfig())
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+
+    val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
+    val buffer = ByteBuffer.allocate(512)
+    for (offset <- appendOffsets) {
+      val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+                                          TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(),
+                                          1L, 0, 0, false, 0)
+      builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+      builder.close()
+    }
+    buffer.flip()
+    val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+    assertThrows[OffsetsOutOfOrderException] {
+      log.appendAsFollower(memoryRecords)
+    }
+  }
+
+  @Test
+  def testAppendBelowExpectedOffsetThrowsException() {
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
-    val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
-    log.appendAsFollower(invalidRecord)
+
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val invalidRecord = MemoryRecords.withRecords(magic, compression, new SimpleRecord(1.toString.getBytes))
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        assertThrows[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(invalidRecord)
+        }
+      }
+    }
+  }
+
+  @Test
+  def testAppendEmptyLogBelowLogStartOffsetThrowsException() {
+    createEmptyLogs(logDir, 7)
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    assertEquals(7L, log.logStartOffset)
+    assertEquals(7L, log.logEndOffset)
+
+    val firstOffset = 4L
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                         new SimpleRecord("k2".getBytes, "v2".getBytes),
+                                         new SimpleRecord("k3".getBytes, "v3".getBytes)),
+                                    magicValue = magic, codec = compression,
+                                    baseOffset = firstOffset)
+
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        val exception = intercept[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(records = batch)
+        }
+        assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset",
+                     firstOffset, exception.firstOffset)
+        assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset",
+                     firstOffset + 2, exception.lastOffset)
+      }
+    }
   }
 
   @Test
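
To run the new tests on their own, the Kafka build's standard Gradle test filter (as
documented in the project README) should work:

    ./gradlew core:test --tests kafka.cluster.PartitionTest
    ./gradlew core:test --tests kafka.log.LogTest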

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.
