kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2300: Error in controller log when broker tries to rejoin cluster
Date Wed, 12 Aug 2015 21:30:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk be633a713 -> a62d63007


KAFKA-2300: Error in controller log when broker tries to rejoin cluster

Author: flavio junqueira <fpj@apache.org>

Reviewers: Ismael Juma, Guozhang Wang

Closes #102 from fpj/2300 and squashes the following commits:

7bd2edb [flavio junqueira] KAFKA-2300: Removed unnecessary s" occurrences.
aa6ec90 [flavio junqueira] KAFKA-2300: Wrapped all occurrences of sendRequestToBrokers with
try/catch and fixed string typo.
f1261b1 [flavio junqueira] Fixed some style issues.
9b6390a [flavio junqueira] Updated package name and removed unnecessary imports.
dbd1bf3 [flavio junqueira] KAFKA-2300: Error in controller log when broker tries to rejoin
cluster


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a62d6300
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a62d6300
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a62d6300

Branch: refs/heads/trunk
Commit: a62d63007c89c0c6f7ad62fe4643f7adc7fbc661
Parents: be633a7
Author: flavio junqueira <fpj@apache.org>
Authored: Wed Aug 12 14:31:39 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Aug 12 14:31:39 2015 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   | 102 +++++-----
 .../kafka/controller/KafkaController.scala      |  52 ++++--
 .../controller/ControllerFailoverTest.scala     | 187 +++++++++++++++++++
 3 files changed, 288 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a62d6300/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 9f521fa..4396b6e 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -30,7 +30,7 @@ import kafka.api.RequestOrResponse
 import collection.Set
 
 class ControllerChannelManager (private val controllerContext: ControllerContext, config:
KafkaConfig) extends Logging {
-  private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
+  protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
@@ -100,7 +100,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
     }
   }
 
-  private def startRequestSendThread(brokerId: Int) {
+  protected def startRequestSendThread(brokerId: Int) {
     val requestThread = brokerStateInfo(brokerId).requestSendThread
     if(requestThread.getState == Thread.State.NEW)
       requestThread.start()
@@ -280,49 +280,67 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends
 Logging
   }
 
   def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) {
-    leaderAndIsrRequestMap.foreach { m =>
-      val broker = m._1
-      val partitionStateInfos = m._2.toMap
-      val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
-      val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b
=> b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId,
controllerEpoch, correlationId, clientId)
-      for (p <- partitionStateInfos) {
-        val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader)
"become-leader" else "become-follower"
-        stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request
%s with correlationId %d to broker %d " +
-                                 "for partition [%s,%d]").format(controllerId, controllerEpoch,
typeOfRequest,
-                                                                 p._2.leaderIsrAndControllerEpoch,
correlationId, broker,
-                                                                 p._1._1, p._1._2))
+    try {
+      leaderAndIsrRequestMap.foreach { m =>
+        val broker = m._1
+        val partitionStateInfos = m._2.toMap
+        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
+        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b
=> b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol))
+        val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId,
controllerEpoch, correlationId, clientId)
+        for (p <- partitionStateInfos) {
+          val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader)
"become-leader" else "become-follower"
+          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request
%s with correlationId %d to broker %d " +
+                                   "for partition [%s,%d]").format(controllerId, controllerEpoch,
typeOfRequest,
+                                                                   p._2.leaderIsrAndControllerEpoch,
correlationId, broker,
+                                                                   p._1._1, p._1._2))
+        }
+        controller.sendRequest(broker, leaderAndIsrRequest, null)
       }
-      controller.sendRequest(broker, leaderAndIsrRequest, null)
-    }
-    leaderAndIsrRequestMap.clear()
-    updateMetadataRequestMap.foreach { m =>
-      val broker = m._1
-      val partitionStateInfos = m._2.toMap
-
-      val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083))
1 else 0
-      val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort,
controllerId = controllerId, controllerEpoch = controllerEpoch,
-        correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos,
aliveBrokers = controllerContext.liveOrShuttingDownBrokers)
-      partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d
sending UpdateMetadata request %s with " +
-        "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch,
p._2.leaderIsrAndControllerEpoch,
-        correlationId, broker, p._1)))
-      controller.sendRequest(broker, updateMetadataRequest, null)
-    }
-    updateMetadataRequestMap.clear()
-    stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
-      val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
-      val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
-      debug("The stop replica request (delete = true) sent to broker %d is %s"
-        .format(broker, stopReplicaWithDelete.mkString(",")))
-      debug("The stop replica request (delete = false) sent to broker %d is %s"
-        .format(broker, stopReplicaWithoutDelete.mkString(",")))
-      replicaInfoList.foreach { r =>
-        val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
-          Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch,
correlationId)
-        controller.sendRequest(broker, stopReplicaRequest, r.callback)
+      leaderAndIsrRequestMap.clear()
+      updateMetadataRequestMap.foreach { m =>
+        val broker = m._1
+        val partitionStateInfos = m._2.toMap
+
+        val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083))
1 else 0
+        val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort,
controllerId = controllerId, controllerEpoch = controllerEpoch,
+          correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos,
aliveBrokers = controllerContext.liveOrShuttingDownBrokers)
+        partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch
%d sending UpdateMetadata request %s with " +
+          "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch,
p._2.leaderIsrAndControllerEpoch,
+          correlationId, broker, p._1)))
+        controller.sendRequest(broker, updateMetadataRequest, null)
+      }
+      updateMetadataRequestMap.clear()
+      stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
+        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
+        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
+        debug("The stop replica request (delete = true) sent to broker %d is %s"
+          .format(broker, stopReplicaWithDelete.mkString(",")))
+        debug("The stop replica request (delete = false) sent to broker %d is %s"
+          .format(broker, stopReplicaWithoutDelete.mkString(",")))
+        replicaInfoList.foreach { r =>
+          val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
+            Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch,
correlationId)
+          controller.sendRequest(broker, stopReplicaRequest, r.callback)
+        }
+      }
+      stopReplicaRequestMap.clear()
+    } catch {
+      case e : Throwable => {
+        if(leaderAndIsrRequestMap.size > 0) {
+          error("Haven't been able to send leader and isr requests, current state of " +
+              s"the map is $leaderAndIsrRequestMap")
+        }
+        if(updateMetadataRequestMap.size > 0) {
+          error("Haven't been able to send metadata update requests, current state of " +
+              s"the map is $updateMetadataRequestMap")
+        }
+        if(stopReplicaRequestMap.size > 0) {
+          error("Haven't been able to send stop replica requests, current state of " +
+              s"the map is $stopReplicaRequestMap")
+        }
+        throw new IllegalStateException(e)
       }
     }
-    stopReplicaRequestMap.clear()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62d6300/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b19e57f..68536f5 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -263,11 +263,20 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerSt
                 } else {
                   // Stop the replica first. The state change below initiates ZK changes
which should take some time
                   // before which the stop replica request should be completed (in most cases)
-                  brokerRequestBatch.newBatch()
-                  brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
-                    topicAndPartition.partition, deletePartition = false)
-                  brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
-
+                  try {
+                    brokerRequestBatch.newBatch()
+                    brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
+                      topicAndPartition.partition, deletePartition = false)
+                    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+                  } catch {
+                    case e : IllegalStateException => {
+                      // Resign if the controller is in an illegal state
+                      error("Forcing the controller to resign")
+                      controllerElector.resign()
+
+                      throw e
+                    }
+                  }
                   // If the broker is a follower, updates the isr in ZK and notifies the
current leader
                   replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
                     topicAndPartition.partition, id)), OfflineReplica)
@@ -341,6 +350,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
    * required to clean up internal controller data structures
    */
   def onControllerResignation() {
+    debug("Controller resigning, broker id %d".format(config.brokerId))
     // de-register listeners
     deregisterIsrChangeNotificationListener()
     deregisterReassignedPartitionsListener()
@@ -888,9 +898,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
     brokerRequestBatch.newBatch()
     updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
-        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
-          topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
-        brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
+        try {
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
+            topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+          brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
+        } catch {
+          case e : IllegalStateException => {
+            // Resign if the controller is in an illegal state
+            error("Forcing the controller to resign")
+            controllerElector.resign()
+
+            throw e
+          }
+        }
         stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with
new assigned replica list %s " +
           "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch,
updatedLeaderIsrAndControllerEpoch,
           newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader,
topicAndPartition))
@@ -998,9 +1018,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerSt
    * @param brokers The brokers that the update metadata request should be sent to
    */
   def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition])
{
-    brokerRequestBatch.newBatch()
-    brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
-    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+    try {
+      brokerRequestBatch.newBatch()
+      brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
+      brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+    } catch {
+      case e : IllegalStateException => {
+        // Resign if the controller is in an illegal state
+        error("Forcing the controller to resign")
+        controllerElector.resign()
+
+        throw e
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62d6300/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
new file mode 100644
index 0000000..206a7c3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.Properties
+
+import junit.framework.Assert._
+import org.scalatest.junit.JUnit3Suite
+
+import org.junit.{Test, After, Before}
+import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.serialize.ZkSerializer
+import org.apache.log4j.{Logger, Level}
+
+import kafka.api.RequestOrResponse
+import kafka.common.TopicAndPartition
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.BrokerState
+import kafka.server.KafkaConfig
+import kafka.server.KafkaServer
+import kafka.server.RunningAsController
+import kafka.utils._
+import kafka.utils.TestUtils._
+
+import scala.collection.Map
+import scala.collection.mutable
+
+
+class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
+  val log = Logger.getLogger(classOf[ControllerFailoverTest])
+  val numNodes = 2
+  val numParts = 1
+  val msgQueueSize = 1
+  val topic = "topic1"
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+
+  override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
+    .map(KafkaConfig.fromProps(_, overridingProps))
+
+  override def setUp() {
+    super.setUp()
+  }
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  /**
+   * See [[https://issues.apache.org/jira/browse/KAFKA-2300 KAFKA-2300]]
+   * for the background of this test case
+   */
+  def testMetadataUpdate() {
+    log.setLevel(Level.INFO)
+    var controller: KafkaServer = this.servers.head;
+    // Find the current controller
+    val epochMap: mutable.Map[Int, Int] = mutable.Map.empty
+    for (server <- this.servers) {
+      epochMap += (server.config.brokerId -> server.kafkaController.epoch)
+      if(server.kafkaController.isActive()) {
+        controller = server
+      }
+    }
+    // Create topic with one partition
+    kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1)
+    val topicPartition = TopicAndPartition("topic1", 0)
+    var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
+    while (!partitions.contains(topicPartition)) {
+      partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
+      Thread.sleep(100)
+    }
+    // Replace channel manager with our mock manager
+    controller.kafkaController.controllerContext.controllerChannelManager.shutdown()
+    val channelManager = new MockChannelManager(controller.kafkaController.controllerContext,
+                                                  controller.kafkaController.config)
+    channelManager.startup()
+    controller.kafkaController.controllerContext.controllerChannelManager = channelManager
+    channelManager.shrinkBlockingQueue(0)
+    channelManager.stopSendThread(0)
+    // Spawn a new thread to block on the outgoing channel
+    // queue
+    val thread = new Thread(new Runnable {
+      def run() {
+        try {
+          controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
+          log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
+          controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
+          log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
+        } catch {
+          case e : Exception => {
+            log.info("Thread interrupted")
+          }
+        }
+      }
+    })
+    thread.setName("mythread")
+    thread.start()
+    while (thread.getState() != Thread.State.WAITING) {
+      Thread.sleep(100)
+    }
+    // Assume that the thread is WAITING because it is
+    // blocked on the queue, so interrupt and move forward
+    thread.interrupt()
+    thread.join()
+    channelManager.resumeSendThread(0)
+    // Wait and find current controller
+    var found = false
+    var counter = 0
+    while (!found && counter < 10) {
+      for (server <- this.servers) {
+        val previousEpoch = (epochMap get server.config.brokerId) match {
+          case Some(epoch) =>
+            epoch
+          case None =>
+            val msg = String.format("Missing element in epoch map %s", epochMap.mkString(", "))
+            throw new IllegalStateException(msg)
+        }
+
+        if (server.kafkaController.isActive
+            && (previousEpoch) < server.kafkaController.epoch) {
+          controller = server
+          found = true
+        }
+      }
+      if (!found) {
+          Thread.sleep(100)
+          counter += 1
+      }
+    }
+    // Give it a shot to make sure that sending isn't blocking
+    try {
+      controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
+    } catch {
+      case e : Throwable => {
+        fail(e)
+      }
+    }
+  }
+}
+
+class MockChannelManager(private val controllerContext: ControllerContext,
+                           config: KafkaConfig)
+                           extends ControllerChannelManager(controllerContext, config) {
+  def stopSendThread(brokerId: Int) {
+    val requestThread = brokerStateInfo(brokerId).requestSendThread
+    requestThread.isRunning.set(false)
+    requestThread.interrupt
+    requestThread.join
+  }
+
+  def shrinkBlockingQueue(brokerId: Int) {
+    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, RequestOrResponse => Unit)](1)
+    val brokerInfo = this.brokerStateInfo(brokerId)
+    this.brokerStateInfo.put(brokerId, new ControllerBrokerStateInfo(brokerInfo.channel,
+                                                                      brokerInfo.broker,
+                                                                      messageQueue,
+                                                                      brokerInfo.requestSendThread))
+  }
+
+  def resumeSendThread (brokerId: Int) {
+    this.startRequestSendThread(0)
+  }
+
+  def queueCapacity(brokerId: Int): Int = {
+    this.brokerStateInfo(brokerId).messageQueue.remainingCapacity
+  }
+
+  def queueSize(brokerId: Int): Int = {
+    this.brokerStateInfo(brokerId).messageQueue.size
+  }
+}
\ No newline at end of file


Mime
View raw message