kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davidart...@apache.org
Subject [kafka] branch 2.7 updated: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics (#9677)
Date Thu, 03 Dec 2020 21:48:01 GMT
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 24867c3  KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics (#9677)
24867c3 is described below

commit 24867c34b4ed180a5f8f1ba531b2fd47d9bef772
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Thu Dec 3 16:11:07 2020 -0500

    KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics (#9677)
    
    Add small interface to Partition.scala that allows AlterIsr and ZK code paths to update
the ISR metrics managed by ReplicaManager. This opens the door for consolidating even more
code between the two ISR update code paths.
    
    Cherry-picked from trunk
---
 core/src/main/scala/kafka/api/ApiVersion.scala     |  2 +
 core/src/main/scala/kafka/cluster/Partition.scala  | 78 +++++++++++++++-------
 .../scala/kafka/controller/KafkaController.scala   |  2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 10 +--
 .../unit/kafka/cluster/AbstractPartitionTest.scala |  5 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  2 +
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 21 +++++-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 25 ++++++-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  4 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |  5 +-
 .../UpdateFollowerFetchStateBenchmark.java         |  5 +-
 11 files changed, 122 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 046d052..7cdbfe7 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -143,6 +143,8 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
   def recordVersion: RecordVersion
   def id: Int
 
+  def isAlterIsrSupported: Boolean = this >= KAFKA_2_7_IV2
+
   override def compare(that: ApiVersion): Int =
     ApiVersion.orderingByVersion.compare(this, that)
 
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f6a9e83..4e06198 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -19,7 +19,7 @@ package kafka.cluster
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.{Optional, Properties}
 
-import kafka.api.{ApiVersion, KAFKA_2_7_IV2, LeaderAndIsr}
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
@@ -45,6 +45,12 @@ import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+trait IsrChangeListener {
+  def markExpand(): Unit
+  def markShrink(): Unit
+  def markFailed(): Unit
+}
+
 trait PartitionStateStore {
   def fetchTopicConfig(): Properties
   def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
@@ -52,8 +58,7 @@ trait PartitionStateStore {
 }
 
 class ZkPartitionStateStore(topicPartition: TopicPartition,
-                            zkClient: KafkaZkClient,
-                            replicaManager: ReplicaManager) extends PartitionStateStore {
+                            zkClient: KafkaZkClient) extends PartitionStateStore {
 
   override def fetchTopicConfig(): Properties = {
     val adminZkClient = new AdminZkClient(zkClient)
@@ -62,15 +67,11 @@ class ZkPartitionStateStore(topicPartition: TopicPartition,
 
   override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] =
{
     val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
-    if (newVersionOpt.isDefined)
-      replicaManager.isrShrinkRate.mark()
     newVersionOpt
   }
 
   override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] =
{
     val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
-    if (newVersionOpt.isDefined)
-      replicaManager.isrExpandRate.mark()
     newVersionOpt
   }
 
@@ -79,10 +80,8 @@ class ZkPartitionStateStore(topicPartition: TopicPartition,
       leaderAndIsr, controllerEpoch)
 
     if (updateSucceeded) {
-      replicaManager.recordIsrChange(topicPartition)
       Some(newVersion)
     } else {
-      replicaManager.failedIsrUpdatesRate.mark()
       None
     }
   }
@@ -107,10 +106,24 @@ object Partition extends KafkaMetricsGroup {
   def apply(topicPartition: TopicPartition,
             time: Time,
             replicaManager: ReplicaManager): Partition = {
+
+    val isrChangeListener = new IsrChangeListener {
+      override def markExpand(): Unit = {
+        replicaManager.recordIsrChange(topicPartition)
+        replicaManager.isrExpandRate.mark()
+      }
+
+      override def markShrink(): Unit = {
+        replicaManager.recordIsrChange(topicPartition)
+        replicaManager.isrShrinkRate.mark()
+      }
+
+      override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark()
+    }
+
     val zkIsrBackingStore = new ZkPartitionStateStore(
       topicPartition,
-      replicaManager.zkClient,
-      replicaManager)
+      replicaManager.zkClient)
 
     val delayedOperations = new DelayedOperations(
       topicPartition,
@@ -124,6 +137,7 @@ object Partition extends KafkaMetricsGroup {
       localBrokerId = replicaManager.config.brokerId,
       time = time,
       stateStore = zkIsrBackingStore,
+      isrChangeListener = isrChangeListener,
       delayedOperations = delayedOperations,
       metadataCache = replicaManager.metadataCache,
       logManager = replicaManager.logManager,
@@ -246,6 +260,7 @@ class Partition(val topicPartition: TopicPartition,
                 localBrokerId: Int,
                 time: Time,
                 stateStore: PartitionStateStore,
+                isrChangeListener: IsrChangeListener,
                 delayedOperations: DelayedOperations,
                 metadataCache: MetadataCache,
                 logManager: LogManager,
@@ -270,7 +285,7 @@ class Partition(val topicPartition: TopicPartition,
   @volatile private[cluster] var isrState: IsrState = CommittedIsr(Set.empty)
   @volatile var assignmentState: AssignmentState = SimpleAssignmentState(Seq.empty)
 
-  private val useAlterIsr: Boolean = interBrokerProtocolVersion >= KAFKA_2_7_IV2
+  private val useAlterIsr: Boolean = interBrokerProtocolVersion.isAlterIsrSupported
 
   // Logs belonging to this partition. Majority of time it will be only one log, but if log
directory
   // is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs
until copy
@@ -1323,6 +1338,9 @@ class Partition(val topicPartition: TopicPartition,
     info(s"Expanding ISR from ${isrState.isr.mkString(",")} to ${newInSyncReplicaIds.mkString(",")}")
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newInSyncReplicaIds.toList,
zkVersion)
     val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
+    if (zkVersionOpt.isDefined) {
+      isrChangeListener.markExpand()
+    }
     maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt)
   }
 
@@ -1349,6 +1367,9 @@ class Partition(val topicPartition: TopicPartition,
   private def shrinkIsrWithZk(newIsr: Set[Int]): Unit = {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
     val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
+    if (zkVersionOpt.isDefined) {
+      isrChangeListener.markShrink()
+    }
     maybeUpdateIsrAndVersionWithZk(newIsr, zkVersionOpt)
   }
 
@@ -1361,6 +1382,7 @@ class Partition(val topicPartition: TopicPartition,
 
       case None =>
         info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, skip updating
ISR")
+        isrChangeListener.markFailed()
     }
   }
 
@@ -1376,6 +1398,7 @@ class Partition(val topicPartition: TopicPartition,
     val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState))
 
     if (!alterIsrManager.enqueue(alterIsrItem)) {
+      isrChangeListener.markFailed()
       throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state "
+
         s"$newLeaderAndIsr for partition $topicPartition")
     }
@@ -1401,27 +1424,36 @@ class Partition(val topicPartition: TopicPartition,
       }
 
       result match {
-        case Left(error: Errors) => error match {
-          case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
-            debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't
know about this topic or partition. Giving up.")
-          case Errors.FENCED_LEADER_EPOCH =>
-            debug(s"Controller failed to update ISR to $proposedIsrState since we sent an
old leader epoch. Giving up.")
-          case Errors.INVALID_UPDATE_VERSION =>
-            debug(s"Controller failed to update ISR to $proposedIsrState due to invalid zk
version. Giving up.")
-          case _ =>
-            warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected
$error. Retrying.")
-            sendAlterIsrRequest(proposedIsrState)
-        }
+        case Left(error: Errors) =>
+          isrChangeListener.markFailed()
+          error match {
+            case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+              debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't
know about this topic or partition. Giving up.")
+            case Errors.FENCED_LEADER_EPOCH =>
+              debug(s"Controller failed to update ISR to $proposedIsrState since we sent
an old leader epoch. Giving up.")
+            case Errors.INVALID_UPDATE_VERSION =>
+              debug(s"Controller failed to update ISR to $proposedIsrState due to invalid
zk version. Giving up.")
+            case _ =>
+              warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected
$error. Retrying.")
+              sendAlterIsrRequest(proposedIsrState)
+          }
         case Right(leaderAndIsr: LeaderAndIsr) =>
           // Success from controller, still need to check a few things
           if (leaderAndIsr.leaderEpoch != leaderEpoch) {
             debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we have a stale
leader epoch $leaderEpoch.")
+            isrChangeListener.markFailed()
           } else if (leaderAndIsr.zkVersion <= zkVersion) {
             debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we have a newer
version $zkVersion.")
+            isrChangeListener.markFailed()
           } else {
             isrState = CommittedIsr(leaderAndIsr.isr.toSet)
             zkVersion = leaderAndIsr.zkVersion
             info(s"ISR updated from AlterIsr to ${isrState.isr.mkString(",")} and version
updated to [$zkVersion]")
+            proposedIsrState match {
+              case PendingExpandIsr(_, _) => isrChangeListener.markExpand()
+              case PendingShrinkIsr(_, _) => isrChangeListener.markShrink()
+              case _ => // nothing to do, shouldn't get here
+            }
           }
       }
     }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index cc66060..c5db8ca 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -84,7 +84,7 @@ class KafkaController(val config: KafkaConfig,
   @volatile private var brokerInfo = initialBrokerInfo
   @volatile private var _brokerEpoch = initialBrokerEpoch
 
-  private val isAlterIsrEnabled = config.interBrokerProtocolVersion >= KAFKA_2_7_IV2
+  private val isAlterIsrEnabled = config.interBrokerProtocolVersion.isAlterIsrSupported
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext
= true, None)
   val controllerContext = new ControllerContext
   var controllerChannelManager = new ControllerChannelManager(controllerContext, config,
time, metrics,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b9487fe..06983d8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -293,9 +293,11 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def recordIsrChange(topicPartition: TopicPartition): Unit = {
-    isrChangeSet synchronized {
-      isrChangeSet += topicPartition
-      lastIsrChangeMs.set(time.milliseconds())
+    if (!config.interBrokerProtocolVersion.isAlterIsrSupported) {
+      isrChangeSet synchronized {
+        isrChangeSet += topicPartition
+        lastIsrChangeMs.set(time.milliseconds())
+      }
     }
   }
   /**
@@ -340,7 +342,7 @@ class ReplicaManager(val config: KafkaConfig,
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before
it is removed from ISR
     scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs
/ 2, unit = TimeUnit.MILLISECONDS)
     // If using AlterIsr, we don't need the znode ISR propagation
-    if (config.interBrokerProtocolVersion < KAFKA_2_7_IV2) {
+    if (!config.interBrokerProtocolVersion.isAlterIsrSupported) {
       scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
         period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
     } else {
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 364e866..603598e 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -23,7 +23,7 @@ import kafka.api.ApiVersion
 import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.server.{Defaults, MetadataCache}
 import kafka.server.checkpoints.OffsetCheckpoints
-import kafka.utils.TestUtils.MockAlterIsrManager
+import kafka.utils.TestUtils.{MockAlterIsrManager, MockIsrChangeListener}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
@@ -41,6 +41,7 @@ class AbstractPartitionTest {
   var logDir2: File = _
   var logManager: LogManager = _
   var alterIsrManager: MockAlterIsrManager = _
+  var isrChangeListener: MockIsrChangeListener = _
   var logConfig: LogConfig = _
   val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
   val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
@@ -63,12 +64,14 @@ class AbstractPartitionTest {
     logManager.startup()
 
     alterIsrManager = TestUtils.createAlterIsrManager()
+    isrChangeListener = TestUtils.createIsrChangeListener()
     partition = new Partition(topicPartition,
       replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
       stateStore,
+      isrChangeListener,
       delayedOperations,
       metadataCache,
       logManager,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8e696fb..e5dbec7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -249,6 +249,7 @@ class PartitionLockTest extends Logging {
     val brokerId = 0
     val topicPartition = new TopicPartition("test-topic", 0)
     val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
+    val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener])
     val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
     val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
@@ -261,6 +262,7 @@ class PartitionLockTest extends Logging {
       localBrokerId = brokerId,
       mockTime,
       stateStore,
+      isrChangeListener,
       delayedOperations,
       metadataCache,
       logManager,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 9ff1f02..d7aa33b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -228,6 +228,7 @@ class PartitionTest extends AbstractPartitionTest {
       localBrokerId = brokerId,
       time,
       stateStore,
+      isrChangeListener,
       delayedOperations,
       metadataCache,
       logManager,
@@ -1164,11 +1165,20 @@ class PartitionTest extends AbstractPartitionTest {
       leaderEndOffset = 6L)
 
     assertEquals(alterIsrManager.isrUpdates.size, 1)
-    assertEquals(alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr, List(brokerId, remoteBrokerId))
+    val isrItem = alterIsrManager.isrUpdates.dequeue()
+    assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId))
     assertEquals(Set(brokerId), partition.isrState.isr)
     assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
     assertEquals(10L, remoteReplica.logEndOffset)
     assertEquals(0L, remoteReplica.logStartOffset)
+
+    // Complete the ISR expansion
+    isrItem.callback.apply(Right(new LeaderAndIsr(brokerId, leaderEpoch, List(brokerId, remoteBrokerId),
2)))
+    assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
+
+    assertEquals(isrChangeListener.expands.get, 1)
+    assertEquals(isrChangeListener.shrinks.get, 0)
+    assertEquals(isrChangeListener.failures.get, 0)
   }
 
   @Test
@@ -1221,6 +1231,10 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
     assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
     assertEquals(alterIsrManager.isrUpdates.size, 0)
+
+    assertEquals(isrChangeListener.expands.get, 0)
+    assertEquals(isrChangeListener.shrinks.get, 0)
+    assertEquals(isrChangeListener.failures.get, 1)
   }
 
   @Test
@@ -1639,7 +1653,7 @@ class PartitionTest extends AbstractPartitionTest {
     val topicPartition = new TopicPartition("test", 1)
     val partition = new Partition(
       topicPartition, 1000, ApiVersion.latestVersion, 0,
-      new SystemTime(), mock(classOf[PartitionStateStore]), mock(classOf[DelayedOperations]),
+      new SystemTime(), mock(classOf[PartitionStateStore]), mock(classOf[IsrChangeListener]),
mock(classOf[DelayedOperations]),
       mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterIsrManager]))
 
     val replicas = Seq(0, 1, 2, 3)
@@ -1682,6 +1696,7 @@ class PartitionTest extends AbstractPartitionTest {
       localBrokerId = brokerId,
       time,
       stateStore,
+      isrChangeListener,
       delayedOperations,
       metadataCache,
       spyLogManager,
@@ -1717,6 +1732,7 @@ class PartitionTest extends AbstractPartitionTest {
       localBrokerId = brokerId,
       time,
       stateStore,
+      isrChangeListener,
       delayedOperations,
       metadataCache,
       spyLogManager,
@@ -1753,6 +1769,7 @@ class PartitionTest extends AbstractPartitionTest {
       localBrokerId = brokerId,
       time,
       stateStore,
+      isrChangeListener,
       delayedOperations,
       metadataCache,
       spyLogManager,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index af6bbc4..8b3a583 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,12 +23,13 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Arrays, Collections, Properties}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
 
 import javax.net.ssl.X509TrustManager
 import kafka.api._
-import kafka.cluster.{Broker, EndPoint}
+import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
 import kafka.log._
 import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
 import kafka.server._
@@ -1082,6 +1083,28 @@ object TestUtils extends Logging {
     new MockAlterIsrManager()
   }
 
+  class MockIsrChangeListener extends IsrChangeListener {
+    val expands: AtomicInteger = new AtomicInteger(0)
+    val shrinks: AtomicInteger = new AtomicInteger(0)
+    val failures: AtomicInteger = new AtomicInteger(0)
+
+    override def markExpand(): Unit = expands.incrementAndGet()
+
+    override def markShrink(): Unit = shrinks.incrementAndGet()
+
+    override def markFailed(): Unit = failures.incrementAndGet()
+
+    def reset(): Unit = {
+      expands.set(0)
+      shrinks.set(0)
+      failures.set(0)
+    }
+  }
+
+  def createIsrChangeListener(): MockIsrChangeListener = {
+    new MockIsrChangeListener()
+  }
+
   def produceMessages(servers: Seq[KafkaServer],
                       records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
                       acks: Int = -1): Unit = {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 22c3d2c..3e4a325 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.kafka.jmh.fetcher;
 import kafka.api.ApiVersion$;
 import kafka.cluster.BrokerEndPoint;
 import kafka.cluster.DelayedOperations;
+import kafka.cluster.IsrChangeListener;
 import kafka.cluster.Partition;
 import kafka.cluster.PartitionStateStore;
 import kafka.log.CleanerConfig;
@@ -153,11 +154,12 @@ public class ReplicaFetcherThreadBenchmark {
 
             PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
             Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
+            IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
             OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
             Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
             AlterIsrManager isrChannelManager = Mockito.mock(AlterIsrManager.class);
             Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
-                    0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp),
+                    0, Time.SYSTEM, partitionStateStore, isrChangeListener, new DelayedOperationsMock(tp),
                     Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
 
             partition.makeFollower(partitionState, offsetCheckpoints);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index b1b587c..4323a2c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -19,6 +19,7 @@ package org.apache.kafka.jmh.partition;
 
 import kafka.api.ApiVersion$;
 import kafka.cluster.DelayedOperations;
+import kafka.cluster.IsrChangeListener;
 import kafka.cluster.Partition;
 import kafka.cluster.PartitionStateStore;
 import kafka.log.CleanerConfig;
@@ -118,11 +119,11 @@ public class PartitionMakeFollowerBenchmark {
         PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
         Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
         Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
-
+        IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
         AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
         partition = new Partition(tp, 100,
             ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
-            partitionStateStore, delayedOperations,
+            partitionStateStore, isrChangeListener, delayedOperations,
             Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
         partition.createLogIfNotExists(true, false, offsetCheckpoints);
         executorService.submit((Runnable) () -> {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 2253b08..54e5f48 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -19,6 +19,7 @@ package org.apache.kafka.jmh.partition;
 
 import kafka.api.ApiVersion$;
 import kafka.cluster.DelayedOperations;
+import kafka.cluster.IsrChangeListener;
 import kafka.cluster.Partition;
 import kafka.cluster.PartitionStateStore;
 import kafka.log.CleanerConfig;
@@ -116,11 +117,11 @@ public class UpdateFollowerFetchStateBenchmark {
             .setIsNew(true);
         PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
         Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
-
+        IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
         AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
         partition = new Partition(topicPartition, 100,
                 ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
-                partitionStateStore, delayedOperations,
+                partitionStateStore, isrChangeListener, delayedOperations,
                 Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
         partition.makeLeader(partitionState, offsetCheckpoints);
     }


Mime
View raw message