kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove redundant checkpoint thread started field in ReplicaManager (#6813)
Date Tue, 28 May 2019 19:56:39 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a3c92b  MINOR: Remove redundant checkpoint thread started field in ReplicaManager (#6813)
4a3c92b is described below

commit 4a3c92bfc78044e1a325543e906ad38a2cfbb39c
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue May 28 12:56:22 2019 -0700

    MINOR: Remove redundant checkpoint thread started field in ReplicaManager (#6813)
    
    We have two fields `highWatermarkCheckPointThreadStarted` and `hwThreadInitialized` which appear to be serving the same purpose. This patch gets rid of `hwThreadInitialized`.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 core/src/main/scala/kafka/server/ReplicaManager.scala | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 54d35ef..55663d3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -201,7 +201,6 @@ class ReplicaManager(val config: KafkaConfig,
   @volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
     (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
 
-  private var hwThreadInitialized = false
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
   private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
 
@@ -265,8 +264,8 @@ class ReplicaManager(val config: KafkaConfig,
 
   def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)
 
-  def startHighWaterMarksCheckPointThread() = {
-    if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
+  def startHighWatermarkCheckPointThread() = {
+    if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
       scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
   }
 
@@ -1136,13 +1135,9 @@ class ReplicaManager(val config: KafkaConfig,
             markPartitionOffline(topicPartition)
         }
 
-
         // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
         // have been completely populated before starting the checkpointing there by avoiding weird race conditions
-        if (!hwThreadInitialized) {
-          startHighWaterMarksCheckPointThread()
-          hwThreadInitialized = true
-        }
+        startHighWatermarkCheckPointThread()
 
         val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
         for (partition <- newPartitions) {
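
The simplification above rests on a standard one-shot initialization idiom: AtomicBoolean.compareAndSet(false, true) succeeds for exactly one caller, so startHighWatermarkCheckPointThread() can be invoked on every LeaderAndIsr request while the checkpoint task is still scheduled only once, which is what made the separate `hwThreadInitialized` flag redundant. Below is a minimal, self-contained sketch of that idiom; the `CheckpointStarter` class and `startOnce` method are illustrative stand-ins, not part of Kafka's code.

    import java.util.concurrent.atomic.AtomicBoolean

    // Sketch only: `schedule` stands in for the scheduler.schedule(...) call made by
    // ReplicaManager; the surrounding class is hypothetical.
    class CheckpointStarter(schedule: () => Unit) {
      private val started = new AtomicBoolean(false)

      // Idempotent: only the first caller wins the compareAndSet and schedules the task,
      // so no extra "initialized" flag is needed.
      def startOnce(): Unit = {
        if (started.compareAndSet(false, true))
          schedule()
      }
    }

    object CheckpointStarterDemo {
      def main(args: Array[String]): Unit = {
        val starter = new CheckpointStarter(() => println("checkpoint task scheduled"))
        starter.startOnce()  // first call schedules the task
        starter.startOnce()  // every later call is a no-op
      }
    }

As the comment in the diff notes, the call is deliberately made only after the first LeaderAndIsr request, so high watermarks are not checkpointed before all partitions have been populated; because the call is idempotent, repeating it on later requests is harmless.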

