kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5634; Do not allow segment deletion beyond high watermark
Date Thu, 27 Jul 2017 20:22:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 25bcc2176 -> c2b939c62


KAFKA-5634; Do not allow segment deletion beyond high watermark

This patch changes the segment deletion behavior to take the high watermark of the partition
into account. In particular, segments containing offsets equal to or larger than the high
watermark are no longer eligible for deletion. This is needed to ensure that the log start
offset reported in fetch responses does not get ahead of the high watermark.

Impact: segment deletion may be delayed compared to the previous behavior since the broker must
await advancement of the high watermark. For partitions under heavy load, this may make the active
segment effectively ineligible for deletion since the high watermark may never catch up to
the log end offset.
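
For readers skimming the diff below, the core of the new rule in Log.deletableSegments can be
summarized with the following minimal, self-contained Scala sketch. The names (DeletionRuleSketch,
Segment, isDeletable, retentionPredicate) are placeholders for illustration only, not the patched
identifiers:

object DeletionRuleSketch {
  final case class Segment(baseOffset: Long, sizeInBytes: Int)

  def isDeletable(segment: Segment,
                  nextSegment: Option[Segment],  // None when `segment` is the active (last) segment
                  highWatermark: Long,
                  logEndOffset: Long,
                  retentionPredicate: (Segment, Option[Segment]) => Boolean): Boolean = {
    // Upper bound of the offsets contained in `segment`: the next segment's base offset,
    // or the log end offset if this is the last segment.
    val upperBoundOffset = nextSegment.map(_.baseOffset).getOrElse(logEndOffset)
    // Never delete an empty active segment, since the broker would just re-create it.
    val isLastSegmentAndEmpty = nextSegment.isEmpty && segment.sizeInBytes == 0
    highWatermark >= upperBoundOffset &&
      retentionPredicate(segment, nextSegment) &&
      !isLastSegmentAndEmpty
  }
}

In the patch itself this check is applied while walking the segments from the oldest upward,
stopping at the first segment that fails it.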

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3575 from hachikuji/KAFKA-5634

(cherry picked from commit 6bd7302645a6167299de6de4e5a19749502db25c)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c2b939c6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c2b939c6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c2b939c6

Branch: refs/heads/0.11.0
Commit: c2b939c623ed2b532f129da2233ee8abe83cf94b
Parents: 25bcc21
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Jul 27 12:53:26 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Jul 27 13:22:11 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |   4 +-
 core/src/main/scala/kafka/cluster/Replica.scala |  30 ++--
 core/src/main/scala/kafka/log/Log.scala         | 103 +++++++++-----
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 .../scala/unit/kafka/cluster/ReplicaTest.scala  | 138 +++++++++++++++++++
 .../kafka/log/LogCleanerIntegrationTest.scala   |   2 +
 .../scala/unit/kafka/log/LogManagerTest.scala   |   2 +
 .../src/test/scala/unit/kafka/log/LogTest.scala |  49 ++++++-
 .../server/HighwatermarkPersistenceTest.scala   |  21 +--
 .../unit/kafka/server/ISRExpirationTest.scala   |   8 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   1 +
 .../kafka/server/ReplicaManagerQuotasTest.scala |   4 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   4 +-
 ...rivenReplicationProtocolAcceptanceTest.scala |   6 +-
 .../epoch/OffsetsForLeaderEpochTest.scala       |   2 +-
 15 files changed, 297 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ebf3140..b255691 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -125,8 +125,8 @@ class Partition(val topic: String,
         if (!offsetMap.contains(topicPartition))
           info(s"No checkpointed highwatermark is found for partition $topicPartition")
         val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
-        new Replica(replicaId, this, time, offset, Some(log))
-      } else new Replica(replicaId, this, time)
+        new Replica(replicaId, topicPartition, time, offset, Some(log))
+      } else new Replica(replicaId, topicPartition, time)
     })
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 8f08089..2cd5c0c 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -21,13 +21,14 @@ import kafka.log.Log
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
 import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
 import org.apache.kafka.common.utils.Time
 
 class Replica(val brokerId: Int,
-              val partition: Partition,
+              val topicPartition: TopicPartition,
               time: Time = Time.SYSTEM,
               initialHighWatermarkValue: Long = 0L,
               val log: Option[Log] = None) extends Logging {
@@ -52,14 +53,15 @@ class Replica(val brokerId: Int,
   // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
   @volatile private[this] var _lastCaughtUpTimeMs = 0L
 
-  val topicPartition = partition.topicPartition
-
   def isLocal: Boolean = log.isDefined
 
   def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
 
   val epochs = log.map(_.leaderEpochCache)
 
+  info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
+  log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
+
   /*
   * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
    * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
@@ -99,18 +101,22 @@ class Replica(val brokerId: Int,
     }
   }
 
-  def logEndOffset =
+  def logEndOffset: LogOffsetMetadata =
     if (isLocal)
       log.get.logEndOffsetMetadata
     else
       logEndOffsetMetadata
 
-  def maybeIncrementLogStartOffset(offset: Long) {
+  /**
+   * Increment the log start offset if the new offset is greater than the previous log start offset. The replica
+   * must be local and the new log start offset must be lower than the current high watermark.
+   */
+  def maybeIncrementLogStartOffset(newLogStartOffset: Long) {
     if (isLocal) {
-      if (highWatermark.messageOffset < offset)
-        throw new OffsetOutOfRangeException(s"The specified offset $offset is higher than the high watermark" +
-                                            s" ${highWatermark.messageOffset} of the partition $topicPartition")
-      log.get.maybeIncrementLogStartOffset(offset)
+      if (newLogStartOffset > highWatermark.messageOffset)
+        throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
+          s"since it is larger than the high watermark ${highWatermark.messageOffset}")
+      log.get.maybeIncrementLogStartOffset(newLogStartOffset)
     } else {
       throw new KafkaException(s"Should not try to delete records on partition $topicPartition's non-local replica $brokerId")
     }
@@ -126,7 +132,7 @@ class Replica(val brokerId: Int,
     }
   }
 
-  def logStartOffset =
+  def logStartOffset: Long =
     if (isLocal)
       log.get.logStartOffset
     else
@@ -179,8 +185,8 @@ class Replica(val brokerId: Int,
   override def toString: String = {
     val replicaString = new StringBuilder
     replicaString.append("ReplicaId: " + brokerId)
-    replicaString.append("; Topic: " + partition.topic)
-    replicaString.append("; Partition: " + partition.partitionId)
+    replicaString.append("; Topic: " + topicPartition.topic)
+    replicaString.append("; Partition: " + topicPartition.partition)
     replicaString.append("; isLocal: " + isLocal)
     replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
     if (isLocal) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 824d302..f8b5d82 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Seq, mutable}
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -168,6 +168,13 @@ class Log(@volatile var dir: File,
    */
   @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None
 
+  /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
+   * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
+   * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
+   * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
+   */
+  @volatile private var replicaHighWatermark: Option[Long] = None
+
   /* the actual segments of the log */
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
@@ -702,6 +709,7 @@ class Log(@volatile var dir: File,
 
   def onHighWatermarkIncremented(highWatermark: Long): Unit = {
     lock synchronized {
+      replicaHighWatermark = Some(highWatermark)
       producerStateManager.onHighWatermarkUpdated(highWatermark)
       updateFirstUnstableOffset()
     }
@@ -726,13 +734,14 @@ class Log(@volatile var dir: File,
   /**
    * Increment the log start offset if the provided offset is larger.
    */
-  def maybeIncrementLogStartOffset(offset: Long) {
+  def maybeIncrementLogStartOffset(newLogStartOffset: Long) {
     // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
     // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
     // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
     lock synchronized {
-      if (offset > logStartOffset) {
-        logStartOffset = offset
+      if (newLogStartOffset > logStartOffset) {
+        info(s"Incrementing log start offset of partition $topicPartition to $newLogStartOffset
in dir ${dir.getParent}")
+        logStartOffset = newLogStartOffset
         leaderEpochCache.clearAndFlushEarliest(logStartOffset)
         producerStateManager.truncateHead(logStartOffset)
         updateFirstUnstableOffset()
@@ -1059,12 +1068,15 @@ class Log(@volatile var dir: File,
    * Delete any log segments matching the given predicate function,
    * starting with the oldest segment and moving forward until a segment doesn't match.
    *
-   * @param predicate A function that takes in a single log segment and returns true iff it is deletable
+   * @param predicate A function that takes in a candidate log segment and the next higher segment
+   *                  (if there is one) and returns true iff it is deletable
    * @return The number of segments deleted
    */
-  private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
+  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
     lock synchronized {
       val deletable = deletableSegments(predicate)
+      if (deletable.nonEmpty)
+        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}]
due to $reason")
       deleteSegments(deletable)
     }
   }
@@ -1085,36 +1097,63 @@ class Log(@volatile var dir: File,
   }
 
   /**
-    * Find segments starting from the oldest until the user-supplied predicate is false.
-    * A final segment that is empty will never be returned (since we would just end up re-creating it).
-    * @param predicate A function that takes in a single log segment and returns true iff it is deletable
-    * @return the segments ready to be deleted
-    */
-  private def deletableSegments(predicate: LogSegment => Boolean) = {
-    val lastEntry = segments.lastEntry
-    if (lastEntry == null) Seq.empty
-    else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
+   * Find segments starting from the oldest until the user-supplied predicate is false or the segment
+   * containing the current high watermark is reached. We do not delete segments with offsets at or beyond
+   * the high watermark to ensure that the log start offset can never exceed it. If the high watermark
+   * has not yet been initialized, no segments are eligible for deletion.
+   *
+   * A final segment that is empty will never be returned (since we would just end up re-creating it).
+   *
+   * @param predicate A function that takes in a candidate log segment and the next higher segment
+   *                  (if there is one) and returns true iff it is deletable
+   * @return the segments ready to be deleted
+   */
+  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+    if (segments.isEmpty || replicaHighWatermark.isEmpty) {
+      Seq.empty
+    } else {
+      val highWatermark = replicaHighWatermark.get
+      val deletable = ArrayBuffer.empty[LogSegment]
+      var segmentEntry = segments.firstEntry
+      while (segmentEntry != null) {
+        val segment = segmentEntry.getValue
+        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
+        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
+          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
+        else
+          (null, logEndOffset, segment.size == 0)
+
+        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
+          deletable += segment
+          segmentEntry = nextSegmentEntry
+        } else {
+          segmentEntry = null
+        }
+      }
+      deletable
+    }
   }
 
   /**
-    * Delete any log segments that have either expired due to time based retention
-    * or because the log size is > retentionSize
-    */
+   * Delete any log segments that have either expired due to time based retention
+   * or because the log size is > retentionSize
+   */
   def deleteOldSegments(): Int = {
     if (!config.delete) return 0
     deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
   }
 
-  private def deleteRetentionMsBreachedSegments() : Int = {
+  private def deleteRetentionMsBreachedSegments(): Int = {
     if (config.retentionMs < 0) return 0
     val startMs = time.milliseconds
-    deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
+    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
+      reason = s"retention time ${config.retentionMs}ms breach")
   }
 
-  private def deleteRetentionSizeBreachedSegments() : Int = {
+  private def deleteRetentionSizeBreachedSegments(): Int = {
     if (config.retentionSize < 0 || size < config.retentionSize) return 0
     var diff = size - config.retentionSize
-    def shouldDelete(segment: LogSegment) = {
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
       if (diff - segment.size >= 0) {
         diff -= segment.size
         true
@@ -1122,23 +1161,13 @@ class Log(@volatile var dir: File,
         false
       }
     }
-    deleteOldSegments(shouldDelete)
+    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
   }
 
-  private def deleteLogStartOffsetBreachedSegments() : Int = {
-    // keep active segment to avoid frequent log rolling due to user's DeleteRecordsRequest
-    lock synchronized {
-      val deletable = {
-        if (segments.size() < 2)
-          Seq.empty
-        else
-          logSegments.sliding(2).takeWhile { iterable =>
-            val nextSegment = iterable.toSeq(1)
-            nextSegment.baseOffset <= logStartOffset
-          }.map(_.toSeq(0)).toSeq
-      }
-      deleteSegments(deletable)
-    }
+  private def deleteLogStartOffsetBreachedSegments(): Int = {
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
+      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 853b7c4..9b898e9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1113,7 +1113,7 @@ class ReplicaManager(val config: KafkaConfig,
     val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId))
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for ((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap
+      val hwms = reps.map(r => r.topicPartition -> r.highWatermark.messageOffset).toMap
       try {
         highWatermarkCheckpoints(dir).write(hwms)
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
new file mode 100644
index 0000000..839b9d9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.util.Properties
+
+import kafka.log.{Log, LogConfig}
+import kafka.server.{BrokerTopicStats, LogOffsetMetadata}
+import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.apache.kafka.common.utils.Utils
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+class ReplicaTest {
+
+  val tmpDir = TestUtils.tempDir()
+  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+  val time = new MockTime()
+  val brokerTopicStats = new BrokerTopicStats
+  var log: Log = _
+  var replica: Replica = _
+
+  @Before
+  def setup(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+    val config = LogConfig(logProps)
+    log = Log(logDir,
+      config,
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      brokerTopicStats = brokerTopicStats,
+      time = time)
+
+    replica = new Replica(brokerId = 0,
+      topicPartition = new TopicPartition("foo", 0),
+      time = time,
+      log = Some(log))
+  }
+
+  @After
+  def tearDown(): Unit = {
+    log.close()
+    brokerTopicStats.close()
+    Utils.delete(tmpDir)
+  }
+
+  @Test
+  def testSegmentDeletionWithHighWatermarkInitialization(): Unit = {
+    val initialHighWatermark = 25L
+    replica = new Replica(brokerId = 0,
+      topicPartition = new TopicPartition("foo", 0),
+      time = time,
+      initialHighWatermarkValue = initialHighWatermark,
+      log = Some(log))
+
+    assertEquals(initialHighWatermark, replica.highWatermark.messageOffset)
+
+    val expiredTimestamp = time.milliseconds() - 1000
+    for (i <- 0 until 100) {
+      val records = TestUtils.singletonRecords(value = s"test$i".getBytes, timestamp = expiredTimestamp)
+      log.appendAsLeader(records, leaderEpoch = 0)
+    }
+
+    val initialNumSegments = log.numberOfSegments
+    log.deleteOldSegments()
+    assertTrue(log.numberOfSegments < initialNumSegments)
+    assertTrue(replica.logStartOffset <= initialHighWatermark)
+  }
+
+  @Test
+  def testCannotDeleteSegmentsAtOrAboveHighWatermark(): Unit = {
+    val expiredTimestamp = time.milliseconds() - 1000
+    for (i <- 0 until 100) {
+      val records = TestUtils.singletonRecords(value = s"test$i".getBytes, timestamp = expiredTimestamp)
+      log.appendAsLeader(records, leaderEpoch = 0)
+    }
+
+    // ensure we have at least a few segments so the test case is not trivial
+    assertTrue(log.numberOfSegments > 5)
+    assertEquals(0L, replica.highWatermark.messageOffset)
+    assertEquals(0L, replica.logStartOffset)
+    assertEquals(100L, replica.logEndOffset.messageOffset)
+
+    for (hw <- 0 to 100) {
+      replica.highWatermark = new LogOffsetMetadata(hw)
+      assertEquals(hw, replica.highWatermark.messageOffset)
+      log.deleteOldSegments()
+      assertTrue(replica.logStartOffset <= hw)
+
+      // verify that all segments up to the high watermark have been deleted
+      
+      log.logSegments.headOption.foreach { segment =>
+        assertTrue(segment.baseOffset <= hw)
+        assertTrue(segment.baseOffset >= replica.logStartOffset)
+      }
+      log.logSegments.tail.foreach { segment =>
+        assertTrue(segment.baseOffset > hw)
+        assertTrue(segment.baseOffset >= replica.logStartOffset)
+      }
+    }
+
+    assertEquals(100L, log.logStartOffset)
+    assertEquals(1, log.numberOfSegments)
+    assertEquals(0, log.activeSegment.size)
+  }
+
+  @Test(expected = classOf[OffsetOutOfRangeException])
+  def testCannotIncrementLogStartOffsetPastHighWatermark(): Unit = {
+    for (i <- 0 until 100) {
+      val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
+      log.appendAsLeader(records, leaderEpoch = 0)
+    }
+
+    replica.highWatermark = new LogOffsetMetadata(25L)
+    replica.maybeIncrementLogStartOffset(26L)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 8a119c2..5ecd78c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -101,6 +101,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
       val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
       val startSize = log.size
 
+      log.onHighWatermarkIncremented(log.logEndOffset)
+
       val firstDirty = log.activeSegment.baseOffset
       cleaner.startup()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 8b7819f..87808a3 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -98,6 +98,7 @@ class LogManagerTest {
       offset = info.lastOffset
     }
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
+    log.onHighWatermarkIncremented(log.logEndOffset)
 
     log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
 
@@ -146,6 +147,7 @@ class LogManagerTest {
       offset = info.firstOffset
     }
 
+    log.onHighWatermarkIncremented(log.logEndOffset)
     assertEquals("Check we have the expected number of segments.", numMessages * setSize
/ config.segmentSize, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 008cd27..3c80bff 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -612,6 +612,7 @@ class LogTest {
     assertEquals(2, log.activeProducers.size)
 
     log.maybeIncrementLogStartOffset(1L)
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals(1, log.logSegments.size)
@@ -667,6 +668,7 @@ class LogTest {
     assertEquals(3, log.logSegments.size)
     assertEquals(Set(pid1, pid2), log.activeProducers.keySet)
 
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals(2, log.logSegments.size)
@@ -1300,6 +1302,7 @@ class LogTest {
       assertEquals(currOffset, messagesToAppend)
 
       // time goes by; the log file is deleted
+      log.onHighWatermarkIncremented(currOffset)
       log.deleteOldSegments()
 
       assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset,
log.logEndOffset)
@@ -1812,6 +1815,7 @@ class LogTest {
     val segments = log.logSegments.toArray
     val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
 
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
     assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
@@ -1851,6 +1855,7 @@ class LogTest {
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     // expire all segments
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     log.close()
 
@@ -2186,7 +2191,7 @@ class LogTest {
     topic + "-" + partition
 
   @Test
-  def testDeleteOldSegmentsMethod() {
+  def testDeleteOldSegments() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
@@ -2208,11 +2213,32 @@ class LogTest {
     log.leaderEpochCache.assign(0, 40)
     log.leaderEpochCache.assign(1, 90)
 
+    // segments are not eligible for deletion if no high watermark has been set
+    val numSegments = log.numberOfSegments
+    log.deleteOldSegments()
+    assertEquals(numSegments, log.numberOfSegments)
+    assertEquals(0L, log.logStartOffset)
+
+    // only segments with offset before the current high watermark are eligible for deletion
+    for (hw <- 25 to 30) {
+      log.onHighWatermarkIncremented(hw)
+      log.deleteOldSegments()
+      assertTrue(log.logStartOffset <= hw)
+      log.logSegments.foreach { segment =>
+        val segmentFetchInfo = segment.read(startOffset = segment.baseOffset, maxOffset = None, maxSize = Int.MaxValue)
+        val segmentLastOffsetOpt = segmentFetchInfo.records.records.asScala.lastOption.map(_.offset)
+        segmentLastOffsetOpt.foreach { lastOffset =>
+          assertTrue(lastOffset >= hw)
+        }
+      }
+    }
+
     // expire all segments
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
     assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
-    assertEquals("Epoch entry should be the latest epoch and the leo.", new EpochEntry(1,
100), epochCache(log).epochEntries().head)
+    assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100),
epochCache(log).epochEntries().head)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -2222,7 +2248,6 @@ class LogTest {
     assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
     assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments())
     assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
-
   }
 
   @Test
@@ -2234,6 +2259,7 @@ class LogTest {
       log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("should have 3 segments", 3, log.numberOfSegments)
     assertEquals(log.logStartOffset, 0)
+    log.onHighWatermarkIncremented(log.logEndOffset)
 
     log.maybeIncrementLogStartOffset(1)
     log.deleteOldSegments()
@@ -2264,7 +2290,8 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.deleteOldSegments
+    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.deleteOldSegments()
     assertEquals("should have 2 segments", 2,log.numberOfSegments)
   }
 
@@ -2277,7 +2304,8 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
-    log.deleteOldSegments
+    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.deleteOldSegments()
     assertEquals("should have 3 segments", 3,log.numberOfSegments)
   }
 
@@ -2290,6 +2318,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
   }
@@ -2303,6 +2332,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments)
   }
@@ -2322,6 +2352,7 @@ class LogTest {
     log.logSegments.head.lastModified = time.milliseconds - 20000
 
     val segments = log.numberOfSegments
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 3 segments remaining", segments, log.numberOfSegments)
   }
@@ -2337,6 +2368,7 @@ class LogTest {
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments)
   }
@@ -2407,7 +2439,8 @@ class LogTest {
     cache.assign(2, 10)
 
     //When first segment is removed
-    log.deleteOldSegments
+    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.deleteOldSegments()
 
     //The oldest epoch entry should have been removed
     assertEquals(ListBuffer(EpochEntry(1, 5), EpochEntry(2, 10)), cache.epochEntries)
@@ -2430,7 +2463,8 @@ class LogTest {
     cache.assign(2, 10)
 
     //When first segment removed (up to offset 5)
-    log.deleteOldSegments
+    log.onHighWatermarkIncremented(log.logEndOffset)
+    log.deleteOldSegments()
 
     //The the first entry should have gone from (0,0) => (0,5)
     assertEquals(ListBuffer(EpochEntry(0, 5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
@@ -2881,6 +2915,7 @@ class LogTest {
     assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
 
     log.maybeIncrementLogStartOffset(8L)
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     assertEquals(1, log.logSegments.size)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index b6b40c2..f9ee9f1 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -67,12 +67,13 @@ class HighwatermarkPersistenceTest {
       replicaManager.checkpointHighWatermarks()
       var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
       assertEquals(0L, fooPartition0Hw)
-      val partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
+      val tp0 = new TopicPartition(topic, 0)
+      val partition0 = replicaManager.getOrCreatePartition(tp0)
       // create leader and follower replicas
       val log0 = logManagers.head.createLog(new TopicPartition(topic, 0), LogConfig())
-      val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, time, 0, Some(log0))
+      val leaderReplicaPartition0 = new Replica(configs.head.brokerId, tp0, time, 0, Some(log0))
       partition0.addReplicaIfNotExists(leaderReplicaPartition0)
-      val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, time)
+      val followerReplicaPartition0 = new Replica(configs.last.brokerId, tp0, time)
       partition0.addReplicaIfNotExists(followerReplicaPartition0)
       replicaManager.checkpointHighWatermarks()
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
@@ -112,11 +113,12 @@ class HighwatermarkPersistenceTest {
       replicaManager.checkpointHighWatermarks()
       var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
       assertEquals(0L, topic1Partition0Hw)
-      val topic1Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic1, 0))
+      val t1p0 = new TopicPartition(topic1, 0)
+      val topic1Partition0 = replicaManager.getOrCreatePartition(t1p0)
       // create leader log
-      val topic1Log0 = logManagers.head.createLog(new TopicPartition(topic1, 0), LogConfig())
+      val topic1Log0 = logManagers.head.createLog(t1p0, LogConfig())
       // create a local replica for topic1
-      val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, time, 0, Some(topic1Log0))
+      val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, t1p0, time, 0, Some(topic1Log0))
       topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
       replicaManager.checkpointHighWatermarks()
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
@@ -128,11 +130,12 @@ class HighwatermarkPersistenceTest {
       assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
       assertEquals(5L, topic1Partition0Hw)
       // add another partition and set highwatermark
-      val topic2Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic2, 0))
+      val t2p0 = new TopicPartition(topic2, 0)
+      val topic2Partition0 = replicaManager.getOrCreatePartition(t2p0)
       // create leader log
-      val topic2Log0 = logManagers.head.createLog(new TopicPartition(topic2, 0), LogConfig())
+      val topic2Log0 = logManagers.head.createLog(t2p0, LogConfig())
       // create a local replica for topic2
-      val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, time, 0, Some(topic2Log0))
+      val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, t2p0, time, 0, Some(topic2Log0))
       topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
       replicaManager.checkpointHighWatermarks()
       var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 5d221fe..6a2d1bd 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -189,8 +189,9 @@ class IsrExpirationTest {
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId = config.brokerId
-    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
-    val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
+    val tp = new TopicPartition(topic, partitionId)
+    val partition = replicaManager.getOrCreatePartition(tp)
+    val leaderReplica = new Replica(leaderId, tp, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
     allReplicas.foreach(r => partition.addReplicaIfNotExists(r))
@@ -216,13 +217,14 @@ class IsrExpirationTest {
     val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
+    EasyMock.expect(log.onHighWatermarkIncremented(0L))
     EasyMock.replay(log)
     log
   }
 
   private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
     configs.filter(_.brokerId != leaderId).map { config =>
-      new Replica(config.brokerId, partition, time)
+      new Replica(config.brokerId, partition.topicPartition, time)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9383355..ed7f3bf 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -92,6 +92,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
+    log.onHighWatermarkIncremented(log.logEndOffset)
     log.maybeIncrementLogStartOffset(3)
     log.deleteOldSegments()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 2ee08a2..a58d2ce 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -187,10 +187,10 @@ class ReplicaManagerQuotasTest {
     //create the two replicas
     for ((p, _) <- fetchInfo) {
       val partition = replicaManager.getOrCreatePartition(p)
-      val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
+      val leaderReplica = new Replica(configs.head.brokerId, p, time, 0, Some(log))
       leaderReplica.highWatermark = new LogOffsetMetadata(5)
       partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
-      val followerReplica = new Replica(configs.last.brokerId, partition, time, 0, Some(log))
+      val followerReplica = new Replica(configs.last.brokerId, p, time, 0, Some(log))
       val allReplicas = Set(leaderReplica, followerReplica)
       allReplicas.foreach(partition.addReplicaIfNotExists)
       if (bothReplicasInSync) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 72d7fc5..9183b9c 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -116,12 +116,12 @@ class SimpleFetchTest {
     val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
 
     // create the leader replica with the local log
-    val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
+    val leaderReplica = new Replica(configs.head.brokerId, partition.topicPartition, time, 0, Some(log))
     leaderReplica.highWatermark = new LogOffsetMetadata(partitionHW)
     partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)
 
     // create the follower replica with defined log end offset
-    val followerReplica= new Replica(configs(1).brokerId, partition, time)
+    val followerReplica= new Replica(configs(1).brokerId, partition.topicPartition, time)
     val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
     followerReplica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(leo, MemoryRecords.EMPTY),
                                                           highWatermark = leo.messageOffset,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 93505c2..53e2cd8 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -349,7 +349,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
   }
 
   private def getLog(broker: KafkaServer, partition: Int): Log = {
-    broker.logManager.logsByTopicPartition.get(new TopicPartition(topic, partition)).get
+    broker.logManager.logsByTopicPartition(new TopicPartition(topic, partition))
   }
 
   private def bounce(follower: KafkaServer): Unit = {
@@ -370,8 +370,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   private def awaitISR(tp: TopicPartition): Unit = {
     TestUtils.waitUntilTrue(() => {
-      leader.replicaManager.getReplicaOrException(tp).partition.inSyncReplicas.map(_.brokerId).size == 2
-    }, "")
+      leader.replicaManager.getPartition(tp).get.inSyncReplicas.map(_.brokerId).size == 2
+    }, "Timed out waiting for replicas to join ISR")
   }
 
   private def createProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2b939c6/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index d004641..b445fa6 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -55,7 +55,7 @@ class OffsetsForLeaderEpochTest {
       QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
       new MetadataCache(config.brokerId))
     val partition = replicaManager.getOrCreatePartition(tp)
-    val leaderReplica = new Replica(config.brokerId, partition, time, 0, Some(mockLog))
+    val leaderReplica = new Replica(config.brokerId, partition.topicPartition, time, 0, Some(mockLog))
     partition.addReplicaIfNotExists(leaderReplica)
     partition.leaderReplicaIdOpt = Some(config.brokerId)
 

