kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1298 Controlled shutdown tool doesn't seem to work out of the box; reviewed by Neha Narkhede
Date Wed, 21 May 2014 18:15:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bf83131df -> d0df33df3


KAFKA-1298 Controlled shutdown tool doesn't seem to work out of the box; reviewed by Neha
Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0df33df
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0df33df
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0df33df

Branch: refs/heads/trunk
Commit: d0df33df3df314ccd42f700a592e8efc31e80f87
Parents: bf83131
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Wed May 21 11:14:12 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed May 21 11:14:35 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/ShutdownBroker.scala | 124 -------------------
 .../kafka/controller/KafkaController.scala      |  51 ++++----
 .../main/scala/kafka/server/KafkaConfig.scala   |  64 +++++-----
 3 files changed, 53 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0df33df/core/src/main/scala/kafka/admin/ShutdownBroker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ShutdownBroker.scala b/core/src/main/scala/kafka/admin/ShutdownBroker.scala
deleted file mode 100644
index 2dd47e7..0000000
--- a/core/src/main/scala/kafka/admin/ShutdownBroker.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-import javax.management.remote.{JMXServiceURL, JMXConnectorFactory}
-import javax.management.ObjectName
-import kafka.controller.KafkaController
-import scala.Some
-import kafka.common.{TopicAndPartition, BrokerNotAvailableException}
-
-
-object ShutdownBroker extends Logging {
-
-  private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer)
-
-  private def invokeShutdown(params: ShutdownParams): Boolean = {
-    var zkClient: ZkClient = null
-    try {
-      zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
-      val controllerBrokerId = ZkUtils.getController(zkClient)
-      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + controllerBrokerId)._1
match {
-        case Some(controllerInfo) =>
-          var controllerHost: String = null
-          var controllerJmxPort: Int = -1
-          try {
-            Json.parseFull(controllerInfo) match {
-              case Some(m) =>
-                val brokerInfo = m.asInstanceOf[Map[String, Any]]
-                controllerHost = brokerInfo.get("host").get.toString
-                controllerJmxPort = brokerInfo.get("jmx_port").get.asInstanceOf[Int]
-              case None =>
-                throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId))
-            }
-          }
-          val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(controllerHost,
controllerJmxPort))
-          info("Connecting to jmx url " + jmxUrl)
-          val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
-          val mbsc = jmxc.getMBeanServerConnection
-          val leaderPartitionsRemaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName),
-                                                      "shutdownBroker",
-                                                      Array(params.brokerId),
-                                                      Array(classOf[Int].getName)).asInstanceOf[Set[TopicAndPartition]]
-          val shutdownComplete = (leaderPartitionsRemaining.size == 0)
-          info("Shutdown status: " +
-            (if (shutdownComplete) "complete" else "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining)))
-          shutdownComplete
-        case None =>
-          throw new BrokerNotAvailableException("Broker id %d does not exist".format(controllerBrokerId))
-      }
-    } catch {
-      case t: Throwable =>
-        error("Operation failed due to controller failure", t)
-        false
-    } finally {
-      if (zkClient != null)
-        zkClient.close()
-    }
-  }
-
-  def main(args: Array[String]) {
-    val parser = new OptionParser
-    val brokerOpt = parser.accepts("broker", "REQUIRED: The broker to shutdown.")
-            .withRequiredArg
-            .describedAs("Broker Id")
-            .ofType(classOf[java.lang.Integer])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the
zookeeper connection in the form host:port. " +
-            "Multiple URLS can be given to allow fail-over.")
-            .withRequiredArg
-            .describedAs("urls")
-            .ofType(classOf[String])
-    val numRetriesOpt = parser.accepts("num.retries", "Number of attempts to retry if shutdown
does not complete.")
-            .withRequiredArg
-            .describedAs("number of retries")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(0)
-    val retryIntervalOpt = parser.accepts("retry.interval.ms", "Retry interval if retries
requested.")
-            .withRequiredArg
-            .describedAs("retry interval in ms (> 1000)")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(1000)
-
-    val options = parser.parse(args : _*)
-    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt)
-
-    val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000)
-    val numRetries = options.valueOf(numRetriesOpt).intValue
-
-    val shutdownParams = ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt))
-
-    if (!invokeShutdown(shutdownParams)) {
-      (1 to numRetries).takeWhile(attempt => {
-        info("Retry " + attempt)
-        try {
-          Thread.sleep(retryIntervalMs)
-        }
-        catch {
-          case ie: InterruptedException => // ignore
-        }
-        !invokeShutdown(shutdownParams)
-      })
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0df33df/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2fa1341..e776423 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -121,12 +121,8 @@ class ControllerContext(val zkClient: ZkClient,
   }
 }
 
-trait KafkaControllerMBean {
-  def shutdownBroker(id: Int): Set[TopicAndPartition]
-}
 
 object KafkaController extends Logging {
-  val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
   val stateChangeLogger = new StateChangeLogger("state.change.logger")
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
@@ -155,7 +151,7 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState)
extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState)
extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -238,7 +234,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
           throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
 
         controllerContext.shuttingDownBrokerIds.add(id)
-
         debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
         debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
       }
@@ -251,31 +246,29 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerSt
 
       allPartitionsAndReplicationFactorOnBroker.foreach {
         case(topicAndPartition, replicationFactor) =>
-        // Move leadership serially to relinquish lock.
-        inLock(controllerContext.controllerLock) {
-          controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch
=>
-            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
-              // If the broker leads the topic partition, transition the leader and update
isr. Updates zk and
-              // notifies all affected brokers
-              partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                controlledShutdownPartitionLeaderSelector)
-            }
-            else {
-              // Stop the replica first. The state change below initiates ZK changes which
should take some time
-              // before which the stop replica request should be completed (in most cases)
-              brokerRequestBatch.newBatch()
-              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
-                topicAndPartition.partition, deletePartition = false)
-              brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
-
-              // If the broker is a follower, updates the isr in ZK and notifies the current
leader
-              replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                topicAndPartition.partition, id)), OfflineReplica)
+          // Move leadership serially to relinquish lock.
+          inLock(controllerContext.controllerLock) {
+            controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch
=>
+              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id && replicationFactor
> 1) {
+                // If the broker leads the topic partition, transition the leader and update
isr. Updates zk and
+                // notifies all affected brokers
+                partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
+                  controlledShutdownPartitionLeaderSelector)
+              } else {
+                // Stop the replica first. The state change below initiates ZK changes which
should take some time
+                // before which the stop replica request should be completed (in most cases)
+                brokerRequestBatch.newBatch()
+                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
+                  topicAndPartition.partition, deletePartition = false)
+                brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+
+                // If the broker is a follower, updates the isr in ZK and notifies the current
leader
+                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+                  topicAndPartition.partition, id)), OfflineReplica)
+              }
             }
           }
-        }
       }
-
       def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
         trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
         controllerContext.partitionLeadershipInfo.filter {
@@ -315,7 +308,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
       partitionStateMachine.startup()
       // register the partition change listeners for all existing topics on failover
       controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
-      Utils.registerMBean(this, KafkaController.MBeanName)
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId,
epoch))
       brokerState.newState(RunningAsController)
       maybeTriggerPartitionReassignment()
@@ -346,7 +338,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
       if (config.autoLeaderRebalanceEnable)
         autoRebalanceScheduler.shutdown()
 
-      Utils.unregisterMBean(KafkaController.MBeanName)
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
       if(controllerContext.controllerChannelManager != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0df33df/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d0bbeb6..c7508d5 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -31,7 +31,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
     this(new VerifiableProperties(originalProps))
     props.verify()
   }
-  
+
   private def getLogRetentionTimeMillis(): Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
@@ -40,44 +40,44 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
     } else {
        millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
     }
-    
+
   }
 
   /*********** General Configuration ***********/
-  
+
   /* the broker id for this server */
   val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
 
   /* the maximum size of message that the server can receive */
   val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead,
(0, Int.MaxValue))
-  
+
   /* the number of network threads that the server uses for handling network requests */
   val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
 
   /* the number of io threads that the server uses for carrying out network requests */
   val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
-  
+
   /* the number of threads to use for various background processing tasks */
   val backgroundThreads = props.getIntInRange("background.threads", 10, (1, Int.MaxValue))
-  
+
   /* the number of queued requests allowed before blocking the network threads */
   val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
-  
+
   /*********** Socket Server Configuration ***********/
-  
+
   /* the port to listen and accept connections on */
   val port: Int = props.getInt("port", 6667)
 
   /* hostname of broker. If this is set, it will only bind to this address. If this is not
set,
    * it will bind to all interfaces */
   val hostName: String = props.getString("host.name", null)
-  
+
   /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may
    * need to be different from the interface to which the broker binds. If this is not set,
    * it will use the value for "host.name" if configured. Otherwise
    * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */
   val advertisedHostName: String = props.getString("advertised.host.name", hostName)
-    
+
   /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may
    * need to be different from the port to which the broker binds. If this is not set,
    * it will publish the same port that the broker binds to. */
@@ -85,22 +85,22 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
  /* the SO_SNDBUF buffer of the socket server sockets */
  val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024)
-  
+
  /* the SO_RCVBUF buffer of the socket server sockets */
  val socketReceiveBufferBytes: Int = props.getInt("socket.receive.buffer.bytes", 100*1024)
-  
+
   /* the maximum number of bytes in a socket request */
   val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024,
(1, Int.MaxValue))
-  
+
   /*********** Log Configuration ***********/
 
   /* the default number of log partitions per topic */
   val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
-  
+
   /* the directories in which the log data is kept */
   val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir",
"/tmp/kafka-logs")))
   require(logDirs.size > 0)
-  
+
   /* the maximum size of a single log file */
   val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize,
Int.MaxValue))
 
@@ -115,42 +115,42 @@ class KafkaConfig private (val props: VerifiableProperties) extends
ZKConfig(pro
 
  /* the frequency in milliseconds that the log cleaner checks whether any log is eligible for
deletion */
   val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000,
(1, Long.MaxValue))
-  
+
   /* the default cleanup policy for segments beyond the retention window, must be either
"delete" or "compact" */
   val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
-  
+
   /* the number of background threads to use for log cleaning */
   val logCleanerThreads = props.getIntInRange("log.cleaner.threads", 1, (0, Int.MaxValue))
-  
+
   /* the log cleaner will be throttled so that the sum of its read and write i/o will be
less than this value on average */
   val logCleanerIoMaxBytesPerSecond = props.getDouble("log.cleaner.io.max.bytes.per.second",
Double.MaxValue)
-  
+
   /* the total memory used for log deduplication across all cleaner threads */
   val logCleanerDedupeBufferSize = props.getLongInRange("log.cleaner.dedupe.buffer.size",
500*1024*1024L, (0, Long.MaxValue))
   require(logCleanerDedupeBufferSize / logCleanerThreads > 1024*1024, "log.cleaner.dedupe.buffer.size
must be at least 1MB per cleaner thread.")
-  
+
   /* the total memory used for log cleaner I/O buffers across all cleaner threads */
   val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 512*1024,
(0, Int.MaxValue))
-  
+
   /* log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become.
A higher value
    * will allow more log to be cleaned at once but will lead to more hash collisions */
   val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor",
0.9d)
-  
+
   /* the amount of time to sleep when there are no logs to clean */
   val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 15*1000, (0L,
Long.MaxValue))
-  
+
  /* the minimum ratio of dirty log to total log for a log to be eligible for cleaning */
  val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)
-  
+
   /* should we enable log cleaning? */
   val logCleanerEnable = props.getBoolean("log.cleaner.enable", false)
-  
+
   /* how long are delete records retained? */
   val logCleanerDeleteRetentionMs = props.getLong("log.cleaner.delete.retention.ms", 24 *
60 * 60 * 1000L)
-  
+
   /* the maximum size in bytes of the offset index */
   val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024,
(4, Int.MaxValue))
-  
+
   /* the interval with which we add an entry to the offset index */
   val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
 
@@ -165,7 +165,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed
to disk */
   val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs)
-  
+
   /* the frequency with which we update the persistent record of the last flush which acts
as the log recovery point */
   val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms",
60000, (0, Int.MaxValue))
 
@@ -210,7 +210,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* number of fetcher threads used to replicate messages from a source broker.
    * Increasing this value can increase the degree of I/O parallelism in the follower broker.
*/
   val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
-  
+
   /* the frequency with which the high watermark is saved out to disk */
   val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms",
5000L)
 
@@ -245,10 +245,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends
ZKConfig(pro
   val controlledShutdownRetryBackoffMs = props.getInt("controlled.shutdown.retry.backoff.ms",
5000)
 
   /* enable controlled shutdown of the server */
-  val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", default =
false)
+  val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", default =
true)
 
   /*********** Offset management configuration ***********/
-  
+
   /* the maximum size for a metadata entry associated with an offset commit */
   val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", OffsetManagerConfig.DefaultMaxMetadataSize)
 


Mime
View raw message