kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-928 new topics may not be processed after ZK session expiration in controller; reviewed by Jun Rao
Date Mon, 03 Jun 2013 21:06:46 GMT
Updated Branches:
  refs/heads/0.8 4850519a2 -> 658427638


KAFKA-928 new topics may not be processed after ZK session expiration in controller; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65842763
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65842763
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65842763

Branch: refs/heads/0.8
Commit: 658427638f1b00ae1d284e579b4631f7119e54cc
Parents: 4850519
Author: Neha Narkhede <nehanarkhede@apache.org>
Authored: Mon Jun 3 14:06:08 2013 -0700
Committer: Neha Narkhede <nehanarkhede@apache.org>
Committed: Mon Jun 3 14:06:08 2013 -0700

----------------------------------------------------------------------
 .../kafka/controller/PartitionStateMachine.scala   |    5 ++---
 .../kafka/controller/ReplicaStateMachine.scala     |    5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65842763/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 0f5ebde..deebed0 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -46,7 +46,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
     controllerId, controller.clientId)
   private val hasStarted = new AtomicBoolean(false)
-  private val hasShutdown = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -74,7 +73,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    hasShutdown.compareAndSet(false, true)
+    hasStarted.set(false)
     partitionState.clear()
   }
 
@@ -358,7 +357,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : java.util.List[String]) {
       controllerContext.controllerLock synchronized {
-        if (!hasShutdown.get) {
+        if (hasStarted.get) {
           try {
             debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
             val currentChildren = JavaConversions.asBuffer(children).toSet

http://git-wip-us.apache.org/repos/asf/kafka/blob/65842763/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5283fcd..4fbb28e 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -45,7 +45,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
     controllerId, controller.clientId)
   private val hasStarted = new AtomicBoolean(false)
-  private val hasShutdown = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
@@ -73,7 +72,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    hasShutdown.compareAndSet(false, true)
+    hasStarted.set(false)
     replicaState.clear()
   }
 
@@ -252,7 +251,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       ControllerStats.leaderElectionTimer.time {
         info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
         controllerContext.controllerLock synchronized {
-          if (!hasShutdown.get) {
+          if (hasStarted.get) {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
               val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds


Mime
View raw message