kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1374467 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/consumer/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/log/ test/scala/unit/kafka/server/
Date Fri, 17 Aug 2012 23:09:05 GMT
Author: junrao
Date: Fri Aug 17 23:09:05 2012
New Revision: 1374467

URL: http://svn.apache.org/viewvc?rev=1374467&view=rev
Log:
KafkaController NPE in SessionExpireListener; patched by Yang Ye; reviewed by Jun Rao, Neha
Narkhede; KAFKA-464

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1374467&r1=1374466&r2=1374467&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Fri Aug 17 23:09:05 2012
@@ -120,7 +120,7 @@ private[kafka] class ZookeeperConsumerCo
   connectZk()
   createFetcher()
   if (config.autoCommit) {
-    scheduler.startUp
+    scheduler.startup
     info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
     scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
       config.autoCommitIntervalMs, false)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1374467&r1=1374466&r2=1374467&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Fri
Aug 17 23:09:05 2012
@@ -38,14 +38,14 @@ class RequestSendThread(val controllerId
         extends Thread("requestSendThread-" + toBrokerId) with Logging {
   this.logIdent = "Controller %d, request send thread to broker %d, ".format(controllerId,
toBrokerId)
   val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  private val shutDownLatch = new CountDownLatch(1)
+  private val shutdownLatch = new CountDownLatch(1)
   private val lock = new Object()
 
-  def shutDown(): Unit = {
+  def shutdown(): Unit = {
     info("shutting down")
     isRunning.set(false)
     interrupt()
-    shutDownLatch.await()
+    shutdownLatch.await()
     info("shutted down completed")
   }
 
@@ -84,7 +84,7 @@ class RequestSendThread(val controllerId
       case e: InterruptedException => warn("intterrupted. Shutting down")
       case e1 => error("Error due to ", e1)
     }
-    shutDownLatch.countDown()
+    shutdownLatch.countDown()
   }
 }
 
@@ -107,7 +107,7 @@ class ControllerChannelManager(allBroker
     messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse)
=> Unit)](config.controllerMessageQueueSize))
   }
 
-  def startUp() = {
+  def startup() = {
     for((brokerId, broker) <- brokers){
       val thread = new RequestSendThread(config.brokerId, brokerId, messageQueues(brokerId),
messageChannels(brokerId))
       thread.setDaemon(false)
@@ -116,7 +116,7 @@ class ControllerChannelManager(allBroker
     }
   }
 
-  def shutDown() = {
+  def shutdown() = {
     lock synchronized {
       for((brokerId, broker) <- brokers){
         removeBroker(brokerId)
@@ -152,7 +152,7 @@ class ControllerChannelManager(allBroker
         messageChannels(brokerId).disconnect()
         messageChannels.remove(brokerId)
         messageQueues.remove(brokerId)
-        messageThreads(brokerId).shutDown()
+        messageThreads(brokerId).shutdown()
         messageThreads.remove(brokerId)
       }catch {
         case e => error("Error while removing broker by the controller", e)
@@ -163,7 +163,8 @@ class ControllerChannelManager(allBroker
 
 class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
   this.logIdent = "Controller " + config.brokerId + ", "
-  info("startup");
+  info("startup")
+  private var isRunning = true
   private val controllerLock = new Object
   private var controllerChannelManager: ControllerChannelManager = null
   private var allBrokers : Set[Broker] = null
@@ -189,7 +190,7 @@ class KafkaController(config : KafkaConf
       info("allPartitionReplicaAssignment: %s".format(allPartitionReplicaAssignment))
       allLeaders = new mutable.HashMap[(String, Int), Int]
       controllerChannelManager = new ControllerChannelManager(allBrokers, config)
-      controllerChannelManager.startUp()
+      controllerChannelManager.startup()
       return true
     } catch {
       case e: ZkNodeExistsException =>
@@ -201,6 +202,10 @@ class KafkaController(config : KafkaConf
   }
 
   private def controllerRegisterOrFailover(){
+    if(!isRunning){
+      info("controller has already been shut down, don't need to compete for lead controller
any more")
+      return
+    }
     info("try to become controller")
     if(tryToBecomeController() == true){
       info("won the controller competition and work on leader and isr recovery")
@@ -209,12 +214,7 @@ class KafkaController(config : KafkaConf
       onBrokerChange()
 
       // If there are some partition with leader not initialized, init the leader for them
-      val partitionReplicaAssignment = allPartitionReplicaAssignment.clone()
-      for((topicPartition, replicas) <- partitionReplicaAssignment){
-        if (allLeaders.contains(topicPartition)){
-          partitionReplicaAssignment.remove(topicPartition)
-        }
-      }
+      val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(m => !allLeaders.contains(m._1))
       debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(),
allLeaders))
       initLeaders(partitionReplicaAssignment)
     }
@@ -228,18 +228,20 @@ class KafkaController(config : KafkaConf
     controllerLock synchronized {
       registerSessionExpirationListener()
       registerControllerExistListener()
+      isRunning = true
       controllerRegisterOrFailover()
     }
   }
 
-  def shutDown() = {
+  def shutdown() = {
     controllerLock synchronized {
       if(controllerChannelManager != null){
         info("shut down")
-        controllerChannelManager.shutDown()
+        controllerChannelManager.shutdown()
         controllerChannelManager = null
         info("shutted down completely")
       }
+      isRunning = false
     }
   }
 
@@ -280,11 +282,13 @@ class KafkaController(config : KafkaConf
     @throws(classOf[Exception])
     def handleNewSession() {
       controllerLock synchronized {
-        info("session expires, clean up the state")
-        controllerChannelManager.shutDown()
-        controllerChannelManager = null
-        controllerRegisterOrFailover()
+        if(controllerChannelManager != null){
+          info("session expires, clean up the state")
+          controllerChannelManager.shutdown()
+          controllerChannelManager = null
+        }
       }
+      controllerRegisterOrFailover()
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1374467&r1=1374466&r2=1374467&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Aug
17 23:09:05 2012
@@ -65,7 +65,7 @@ class KafkaServer(val config: KafkaConfi
     }
 
     /* start scheduler */
-    kafkaScheduler.startUp
+    kafkaScheduler.startup
 
     /* start log manager */
     logManager = new LogManager(config,
@@ -132,7 +132,7 @@ class KafkaServer(val config: KafkaConfi
         logManager.shutdown()
 
       if(kafkaController != null)
-        kafkaController.shutDown()
+        kafkaController.shutdown()
 
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1374467&r1=1374466&r2=1374467&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Fri
Aug 17 23:09:05 2012
@@ -34,7 +34,7 @@ class KafkaScheduler(val numThreads: Int
     }
   private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
 
-  def startUp = {
+  def startup = {
     executor = new ScheduledThreadPoolExecutor(numThreads)
     executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
     executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1374467&r1=1374466&r2=1374467&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri
Aug 17 23:09:05 2012
@@ -46,7 +46,7 @@ class LogManagerTest extends JUnit3Suite
                    override val logFileSize = 1024
                    override val flushInterval = 100
                  }
-    scheduler.startUp
+    scheduler.startup
     logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge,
false)
     logManager.startup
     logDir = logManager.logDir

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1374467&r1=1374466&r2=1374467&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
Fri Aug 17 23:09:05 2012
@@ -37,7 +37,7 @@ class HighwatermarkPersistenceTest exten
     EasyMock.replay(zkClient)
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
-    scheduler.startUp
+    scheduler.startup
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler,
null)
     replicaManager.startup()
@@ -80,7 +80,7 @@ class HighwatermarkPersistenceTest exten
     EasyMock.replay(zkClient)
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
-    scheduler.startUp
+    scheduler.startup
     // create replica manager
     val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler,
null)
     replicaManager.startup()



Mime
View raw message