kafka-commits mailing list archives

From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-5036; Second part (Points 2 -> 5): Refactor caching of Latest Epoch
Date Tue, 18 Apr 2017 00:24:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 17ce2a730 -> 020ca7903
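
A note on the refactor (illustrative, not part of the commit itself): the test changes below show the two-step cacheLatestEpoch(epoch) / maybeAssignLatestCachedEpochToLeo() sequence being replaced by a single assign(epoch, offset) call, and latestUsedEpoch being renamed to latestEpoch. The following is a minimal, self-contained Scala sketch of the behaviour those tests exercise; SimpleEpochCache and its internals are assumptions made for illustration, not the Kafka LeaderEpochFileCache implementation.

import scala.collection.mutable.ListBuffer

// Hypothetical simplified cache, mirroring only the semantics exercised in the tests below.
case class EpochEntry(epoch: Int, startOffset: Long)

class SimpleEpochCache(logEndOffset: () => Long) {
  private val entries = ListBuffer.empty[EpochEntry]

  // Record the start offset of a newly seen leader epoch in one step.
  // Assigning the same (or an older) epoch again is ignored, so the
  // first start offset recorded for an epoch is kept.
  def assign(epoch: Int, offset: Long): Unit =
    if (entries.isEmpty || epoch > entries.last.epoch)
      entries += EpochEntry(epoch, offset)

  // -1 stands in for UNDEFINED_EPOCH when nothing has been assigned yet.
  def latestEpoch: Int = entries.lastOption.map(_.epoch).getOrElse(-1)

  // End offset of an epoch: the start offset of the next epoch, or the
  // current log end offset when asked about the latest epoch.
  def endOffsetFor(epoch: Int): Long = {
    val idx = entries.indexWhere(_.epoch == epoch)
    if (idx < 0) -1L
    else if (idx == entries.size - 1) logEndOffset()
    else entries(idx + 1).startOffset
  }
}

object EpochCacheDemo extends App {
  var leo = 9L
  val cache = new SimpleEpochCache(() => leo)
  cache.assign(2, 9)
  cache.assign(2, 10)                    // same epoch again: start offset 9 is kept
  leo = 11
  assert(cache.latestEpoch == 2)
  assert(cache.endOffsetFor(2) == 11)    // latest epoch resolves to the log end offset
}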


http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index f65884e..5dfcb63 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -94,7 +94,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val log = logManager.getLog(new TopicPartition(topic, part)).get
 
     for (_ <- 0 until 20)
-      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
     log.maybeIncrementLogStartOffset(3)
@@ -128,7 +128,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val log = logManager.getLog(new TopicPartition(topic, part)).get
 
     for (_ <- 0 until 20)
-      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
     val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15)
@@ -189,7 +189,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
 
     for (_ <- 0 until 20)
-      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
     val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
@@ -217,7 +217,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val logManager = server.getLogManager
     val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
     for (_ <- 0 until 20)
-      log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()))
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
 
     val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime, 10)

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index d4118c1..b0e81a9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -116,7 +116,7 @@ class ReplicaFetcherThreadTest {
     //Stubs
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
-    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(leaderEpochs.latestEpoch).andReturn(5)
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stub(replica, replicaManager)
 
@@ -174,7 +174,7 @@ class ReplicaFetcherThreadTest {
     expect(logManager.truncateTo(capture(truncateToCapture))).once
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
-    expect(leaderEpochs.latestUsedEpoch).andReturn(5).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(5).anyTimes()
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stub(replica, replicaManager)
 
@@ -220,7 +220,7 @@ class ReplicaFetcherThreadTest {
     expect(logManager.truncateTo(capture(truncated))).once
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes()
-    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(leaderEpochs.latestEpoch).andReturn(5)
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stub(replica, replicaManager)
     replay(leaderEpochs, replicaManager, logManager, quota, replica)
@@ -263,7 +263,7 @@ class ReplicaFetcherThreadTest {
     expect(logManager.truncateTo(capture(truncated))).anyTimes()
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes()
-    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(leaderEpochs.latestEpoch).andReturn(5)
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stub(replica, replicaManager)
     replay(leaderEpochs, replicaManager, logManager, quota, replica)
@@ -312,7 +312,7 @@ class ReplicaFetcherThreadTest {
     //Stub return values
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
-    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(leaderEpochs.latestEpoch).andReturn(5)
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stub(replica, replicaManager)
 
@@ -358,7 +358,7 @@ class ReplicaFetcherThreadTest {
     expect(logManager.truncateTo(capture(truncateToCapture))).once
     expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
     expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
-    expect(leaderEpochs.latestUsedEpoch).andReturn(5)
+    expect(leaderEpochs.latestEpoch).andReturn(5)
     expect(replicaManager.logManager).andReturn(logManager).anyTimes()
     stub(replica, replicaManager)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 1a24c34..afd1f35 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -48,28 +48,10 @@ class LeaderEpochFileCacheTest {
     leo = 11
 
     //Then
-    assertEquals(2, cache.latestUsedEpoch())
+    assertEquals(2, cache.latestEpoch())
     assertEquals(EpochEntry(2, 10), cache.epochEntries()(0))
     assertEquals(11, cache.endOffsetFor(2)) //should match leo
   }
-  
-  @Test
-  def shouldUpdateEpochWithLogEndOffset() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
-
-    //Given
-    leo = 9
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
-    //When
-    cache.cacheLatestEpoch(2)
-    cache.maybeAssignLatestCachedEpochToLeo()
-
-    //Then
-    assertEquals(2, cache.latestUsedEpoch())
-    assertEquals(EpochEntry(2, 9), cache.epochEntries()(0))
-  }
 
   @Test
   def shouldReturnLogEndOffsetIfLatestEpochRequested() = {
@@ -113,38 +95,28 @@ class LeaderEpochFileCacheTest {
     leo = 9
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
 
-    cache.cacheLatestEpoch(2)
-    cache.maybeAssignLatestCachedEpochToLeo()
+    cache.assign(2, leo)
 
     //When called again later
-    leo = 10
-    cache.cacheLatestEpoch(2)
-    cache.maybeAssignLatestCachedEpochToLeo()
+    cache.assign(2, 10)
 
     //Then the offset should NOT have been updated
-    assertEquals(9, cache.epochEntries()(0).startOffset)
+    assertEquals(leo, cache.epochEntries()(0).startOffset)
   }
 
   @Test
   def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = {
-    var leo = 0
-    def leoFinder() = new LogOffsetMetadata(leo)
+    def leoFinder() = new LogOffsetMetadata(0)
 
     //Given
-    leo = 9
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
+    cache.assign(2, 9)
 
-    cache.cacheLatestEpoch(2)
-    cache.maybeAssignLatestCachedEpochToLeo()
-
-    //When update epoch with same leo
-    cache.cacheLatestEpoch(3)
-    cache.maybeAssignLatestCachedEpochToLeo()
+    //When we update with a new epoch but the same offset
+    cache.assign(3, 9)
 
-    //Then the offset should NOT have been updated
-    assertEquals(9, cache.endOffsetFor(3))
-    assertEquals(9, cache.endOffsetFor(2))
-    assertEquals(3, cache.latestUsedEpoch())
+    //Then epoch should have been updated
+    assertEquals(ListBuffer(EpochEntry(2, 9), EpochEntry(3, 9)), cache.epochEntries())
   }
   
   @Test
@@ -168,7 +140,6 @@ class LeaderEpochFileCacheTest {
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
 
     //Then
-    assertEquals(UNDEFINED_EPOCH, cache.latestUsedEpoch())
     assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0))
   }
 
@@ -253,8 +224,7 @@ class LeaderEpochFileCacheTest {
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
 
     //When
-    cache.cacheLatestEpoch(epoch = 2)
-    cache.maybeAssignLatestCachedEpochToLeo()
+    cache.assign(epoch = 2, offset = 100)
 
     //Then
     assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(3))
@@ -312,7 +282,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 1, offset = 7); leo = 8
 
     //Then epoch should not be changed
-    assertEquals(2, cache.latestUsedEpoch())
+    assertEquals(2, cache.latestEpoch())
 
     //Then end offset for epoch 1 shouldn't have changed
     assertEquals(6, cache.endOffsetFor(1))
@@ -347,18 +317,16 @@ class LeaderEpochFileCacheTest {
 
     //Given
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.cacheLatestEpoch(epoch = 0) //leo=0
-    cache.maybeAssignLatestCachedEpochToLeo()
+    cache.assign(epoch = 0, offset = 0) //leo=0
 
     //When
-    cache.cacheLatestEpoch(epoch = 1) //leo=0
-    cache.maybeAssignLatestCachedEpochToLeo()
+    cache.assign(epoch = 1, offset = 0) //leo=0
 
     //Then epoch should go up
-    assertEquals(1, cache.latestUsedEpoch())
+    assertEquals(1, cache.latestEpoch())
     //offset for 1 should still be 0
     assertEquals(0, cache.endOffsetFor(1))
-    //offset for 0 should the start offset of epoch(1) => 0
+    //offset for epoch 0 should still be 0
     assertEquals(0, cache.endOffsetFor(0))
 
     //When we write 5 messages as epoch 1
@@ -366,12 +334,12 @@ class LeaderEpochFileCacheTest {
 
     //Then end offset for epoch(1) should be leo => 5
     assertEquals(5, cache.endOffsetFor(1))
-    //Epoch(0) should still show the start offset for Epoch(1) => 0
+    //Epoch 0 should still be at offset 0
     assertEquals(0, cache.endOffsetFor(0))
 
     //When
-    cache.cacheLatestEpoch(epoch = 2) //leo=5
-    cache.maybeAssignLatestCachedEpochToLeo()
+    cache.assign(epoch = 2, offset = 5) //leo=5
+
     leo = 10 //write another 5 messages
 
     //Then end offset for epoch(2) should be leo => 10
@@ -398,7 +366,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 0, offset = 2); leo = 3
 
     //Then epoch should stay, offsets should grow
-    assertEquals(0, cache.latestUsedEpoch())
+    assertEquals(0, cache.latestEpoch())
     assertEquals(leo, cache.endOffsetFor(0))
 
     //When messages arrive with greater epoch
@@ -406,7 +374,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 1, offset = 4); leo = 5
     cache.assign(epoch = 1, offset = 5); leo = 6
 
-    assertEquals(1, cache.latestUsedEpoch())
+    assertEquals(1, cache.latestEpoch())
     assertEquals(leo, cache.endOffsetFor(1))
 
     //When
@@ -414,7 +382,7 @@ class LeaderEpochFileCacheTest {
     cache.assign(epoch = 2, offset = 7); leo = 8
     cache.assign(epoch = 2, offset = 8); leo = 9
 
-    assertEquals(2, cache.latestUsedEpoch())
+    assertEquals(2, cache.latestEpoch())
     assertEquals(leo, cache.endOffsetFor(2))
 
     //Older epochs should return the start offset of the first message in the subsequent epoch.
@@ -589,7 +557,7 @@ class LeaderEpochFileCacheTest {
     cache.clearLatest(offset = 9)
 
     //Then should keep the preceding epochs
-    assertEquals(3, cache.latestUsedEpoch())
+    assertEquals(3, cache.latestEpoch())
     assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries)
   }
 
@@ -653,7 +621,7 @@ class LeaderEpochFileCacheTest {
     val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
 
     //Then
-    assertEquals(-1, cache.latestUsedEpoch)
+    assertEquals(-1, cache.latestEpoch)
   }
 
   @Test
@@ -692,28 +660,6 @@ class LeaderEpochFileCacheTest {
     cache.clearLatest(7)
   }
 
-  @Test
-  def shouldUpdateEpochCacheOnLeadershipChangeThenCommit(): Unit ={
-    //Given
-    def leoFinder() = new LogOffsetMetadata(5)
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
-    //When
-    cache.cacheLatestEpoch(2)
-
-    //Then
-    assertEquals(UNDEFINED_EPOCH, cache.latestUsedEpoch())
-
-    //When
-    cache.maybeAssignLatestCachedEpochToLeo()
-
-    //Then should have saved epoch
-    assertEquals(2, cache.latestUsedEpoch())
-
-    //Then should have applied LEO to epoch
-    assertEquals(5, cache.endOffsetFor(2))
-  }
-
   @Before
   def setUp() {
     checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())

