kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao
Date Tue, 06 May 2014 23:36:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 44c39c4ea -> 9b6bf4078


kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b6bf407
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b6bf407
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b6bf407

Branch: refs/heads/trunk
Commit: 9b6bf407874ef0fda12d8b2cc7f8331ce4aebeea
Parents: 44c39c4
Author: Timothy Chen <tnachen@gmail.com>
Authored: Tue May 6 16:36:09 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue May 6 16:36:09 2014 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      |  7 ++-
 core/src/main/scala/kafka/log/LogManager.scala  |  6 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 26 ++++++--
 .../kafka/server/KafkaServerStartable.scala     |  8 +++
 .../scala/unit/kafka/log/LogManagerTest.scala   | 64 +++++++++++++++++---
 .../server/HighwatermarkPersistenceTest.scala   |  1 +
 .../unit/kafka/server/ReplicaManagerTest.scala  |  1 +
 7 files changed, 97 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
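
kafka-1384 threads a shared BrokerState object through the controller, the log manager and the server so that the broker's lifecycle can be tracked and exposed as a metric. The BrokerState definitions themselves are not part of this diff; the following is a minimal sketch, assuming a simple mutable holder and using only the state names referenced in the hunks below (the byte values and class shapes are assumptions, not the committed implementation):

// Hypothetical sketch of the broker state holder used by this patch.
// Only the state names come from the diff; byte values and structure are assumed.
sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object RunningAsController extends BrokerStates { val state: Byte = 4 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 5 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 6 }

class BrokerState {
  // Volatile so the BrokerState gauge can read the latest value from any thread.
  @volatile var currentState: Byte = NotRunning.state

  // Accept either a known state object or a raw byte; KafkaServerStartable
  // passes raw bytes so that custom startables can introduce their own states.
  def newState(next: BrokerStates): Unit = newState(next.state)
  def newState(newState: Byte): Unit = { currentState = newState }
}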


http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 401bf1e..2fa1341 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,6 @@ import kafka.cluster.Broker
 import kafka.common._
 import kafka.log.LogConfig
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
 import kafka.utils._
 import kafka.utils.Utils._
@@ -37,6 +36,8 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
+import scala.None
+import kafka.server._
 import scala.Some
 import kafka.common.TopicAndPartition
 
@@ -154,7 +155,7 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -316,6 +317,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
       Utils.registerMBean(this, KafkaController.MBeanName)
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
+      brokerState.newState(RunningAsController)
       maybeTriggerPartitionReassignment()
       maybeTriggerPreferredReplicaElection()
       /* send partition leadership info to all live brokers */
@@ -351,6 +353,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
       }
+      brokerState.newState(RunningAsBroker)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ab72cff..1946c94 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import kafka.utils._
 import scala.collection._
 import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.OffsetCheckpoint
+import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File],
                  val flushCheckpointMs: Long,
                  val retentionCheckMs: Long,
                  scheduler: Scheduler,
+                 val brokerState: BrokerState,
                  private val time: Time) extends Logging {
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LockFile = ".lock"
@@ -109,6 +110,9 @@ class LogManager(val logDirs: Array[File],
         val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
         if(cleanShutDownFile.exists())
           info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
+        else
+          brokerState.newState(RecoveringFromUncleanShutdown)
+
         for(dir <- subDirs) {
           if(dir.isDirectory) {
             info("Loading log '" + dir.getName + "'")
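
In loadLogs above, the absence of the clean shutdown marker is what moves the broker into RecoveringFromUncleanShutdown before per-log recovery starts. A standalone sketch of that check, reusing the BrokerState sketch earlier; the marker file name is an assumed value of Log.CleanShutdownFile, not confirmed by this diff:

import java.io.File

object LogRecoveryCheck {
  // Illustrative only: mirrors the loadLogs change above.
  def markRecoveryIfUnclean(dataDir: File, brokerState: BrokerState): Unit = {
    val cleanShutdownFile = new File(dataDir, ".kafka_cleanshutdown") // assumed marker name
    if (!cleanShutdownFile.exists())
      brokerState.newState(RecoveringFromUncleanShutdown)
  }
}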

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c208f83..c22e51e 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -31,16 +31,19 @@ import kafka.cluster.Broker
 import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
 import kafka.common.ErrorMapping
 import kafka.network.{Receive, BlockingChannel, SocketServer}
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Kafka Server " + config.brokerId + "], "
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
   private var startupComplete = new AtomicBoolean(false)
+  val brokerState: BrokerState = new BrokerState
   val correlationId: AtomicInteger = new AtomicInteger(0)
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
   var zkClient: ZkClient = null
 
+  newGauge(
+    "BrokerState",
+    new Gauge[Int] {
+      def value = brokerState.currentState
+    }
+  )
+
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
     info("starting")
+    brokerState.newState(Starting)
     isShuttingDown = new AtomicBoolean(false)
     shutdownLatch = new CountDownLatch(1)
 
@@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     zkClient = initZk()
 
     /* start log manager */
-    logManager = createLogManager(zkClient)
+    logManager = createLogManager(zkClient, brokerState)
     logManager.startup()
 
     socketServer = new SocketServer(config.brokerId,
@@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     /* start offset manager */
     offsetManager = createOffsetManager()
 
-    kafkaController = new KafkaController(config, zkClient)
+    kafkaController = new KafkaController(config, zkClient, brokerState)
     
     /* start processing requests */
     apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+    brokerState.newState(RunningAsBroker)
    
     Mx4jLoader.maybeLoad()
 
@@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       var prevController : Broker = null
       var shutdownSuceeded : Boolean = false
       try {
+        brokerState.newState(PendingControlledShutdown)
         while (!shutdownSuceeded && remainingRetries > 0) {
           remainingRetries = remainingRetries - 1
 
@@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
               // send the controlled shutdown request
               val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
               channel.send(request)
+
               response = channel.receive()
+
               val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
               if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
                   shutdownResponse.partitionsRemaining.size == 0) {
@@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     val canShutdown = isShuttingDown.compareAndSet(false, true)
     if (canShutdown) {
       Utils.swallow(controlledShutdown())
+      brokerState.newState(BrokerShuttingDown)
       if(kafkaHealthcheck != null)
         Utils.swallow(kafkaHealthcheck.shutdown())
       if(socketServer != null)
@@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       if(zkClient != null)
         Utils.swallow(zkClient.close())
 
+      brokerState.newState(NotRunning)
       shutdownLatch.countDown()
       startupComplete.set(false)
       info("shut down completed")
@@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   def getLogManager(): LogManager = logManager
   
-  private def createLogManager(zkClient: ZkClient): LogManager = {
+  private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, 
                                      segmentMs = 60L * 60L * 1000L * config.logRollHours,
                                      flushInterval = config.logFlushIntervalMessages,
@@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
                    retentionCheckMs = config.logCleanupIntervalMs,
                    scheduler = kafkaScheduler,
+                   brokerState = brokerState,
                    time = time)
   }
 

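KafkaServer now registers a yammer-metrics gauge named BrokerState whose value is the current state byte, so the lifecycle is visible over JMX alongside the state transitions added to startup() and shutdown(). A short sketch of polling it from the same JVM; the exact ObjectName is an assumption about how the JMX reporter names this gauge, not something the diff defines:

import java.lang.management.ManagementFactory
import javax.management.ObjectName

object BrokerStateProbe {
  def main(args: Array[String]): Unit = {
    val mbeanServer = ManagementFactory.getPlatformMBeanServer
    // Assumed name; verify against the metrics reporter's actual naming scheme.
    val name = new ObjectName("kafka.server:type=KafkaServer,name=BrokerState")
    val state = mbeanServer.getAttribute(name, "Value")
    println("Current broker state code: " + state)
  }
}
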
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index acda52b..cef3b84 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -52,6 +52,14 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
     }
   }
 
+  /**
+   * Allow setting broker state from the startable.
+   * This is needed when a custom kafka server startable wants to emit new states that it introduces.
+   */
+  def setServerState(newState: Byte) {
+    server.brokerState.newState(newState)
+  }
+
   def awaitShutdown() = 
     server.awaitShutdown
 

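The new setServerState hook lets code that embeds KafkaServerStartable publish lifecycle states of its own, as the added comment describes. A hypothetical subclass illustrating the intent (the class name and the custom state code are invented for the example, not part of this patch):

import kafka.server.{KafkaConfig, KafkaServerStartable}

class CustomKafkaServerStartable(config: KafkaConfig) extends KafkaServerStartable(config) {
  val WarmingCaches: Byte = 100 // assumed custom state code, not defined by Kafka

  override def startup() {
    setServerState(WarmingCaches) // report the custom state before normal startup
    super.startup()
  }
}
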
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index be1a1ee..d03d4c4 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,10 +21,9 @@ import java.io._
 import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerState, OffsetCheckpoint}
 import kafka.common._
 import kafka.utils._
-import kafka.server.OffsetCheckpoint
 
 class LogManagerTest extends JUnit3Suite {
 
@@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite {
                                 flushCheckpointMs = 100000L, 
                                 retentionCheckMs = 1000L, 
                                 scheduler = time.scheduler, 
-                                time = time)
+                                time = time,
+                                brokerState = new BrokerState())
     logManager.startup
     logDir = logManager.logDirs(0)
   }
@@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite {
     logManager.shutdown()
 
     val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
-    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(
+      logDirs = Array(logDir),
+      topicConfigs = Map(),
+      defaultConfig = config,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 100000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      brokerState = new BrokerState(),
+      time = time
+    )
     logManager.startup
 
     // create a log
@@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite {
   def testTimeBasedFlush() {
     logManager.shutdown()
     val config = logConfig.copy(flushMs = 1000)
-    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(
+      logDirs = Array(logDir),
+      topicConfigs = Map(),
+      defaultConfig = config,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      brokerState = new BrokerState(),
+      time = time
+    )
     logManager.startup
     val log = logManager.createLog(TopicAndPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
@@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite {
                      TestUtils.tempDir(), 
                      TestUtils.tempDir())
     logManager.shutdown()
-    logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(
+      logDirs = dirs,
+      topicConfigs = Map(),
+      defaultConfig = logConfig,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      brokerState = new BrokerState(),
+      time = time
+    )
     
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
@@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite {
   @Test
   def testTwoLogManagersUsingSameDirFails() {
     try {
-      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+      new LogManager(
+        logDirs = Array(logDir),
+        topicConfigs = Map(),
+        defaultConfig = logConfig,
+        cleanerConfig = cleanerConfig,
+        flushCheckMs = 1000L,
+        flushCheckpointMs = 10000L,
+        retentionCheckMs = 1000L,
+        scheduler = time.scheduler,
+        brokerState = new BrokerState(),
+        time = time
+      )
       fail("Should not be able to create a second log manager instance with the same data directory")
     } catch {
       case e: KafkaException => // this is good 
@@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite {
       flushCheckpointMs = 100000L,
       retentionCheckMs = 1000L,
       scheduler = time.scheduler,
-      time = time)
+      time = time,
+      brokerState = new BrokerState())
     logManager.startup
     verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
   }
@@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite {
       flushCheckpointMs = 100000L,
       retentionCheckMs = 1000L,
       scheduler = time.scheduler,
-      time = time)
+      time = time,
+      brokerState = new BrokerState())
     logManager.startup
     verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index a78f7cf..558a5d6 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
                                                          flushCheckpointMs = 10000L,
                                                          retentionCheckMs = 30000,
                                                          scheduler = new KafkaScheduler(1),
+                                                         brokerState = new BrokerState(),
                                                          time = new MockTime))
     
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 41ebc7a..518d416 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite {
       flushCheckpointMs = 100000L,
       retentionCheckMs = 1000L,
       scheduler = time.scheduler,
+      brokerState = new BrokerState(),
       time = time)
   }
 

