kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2558: ServerShutdownTest is failing intermittently
Date Tue, 22 Sep 2015 01:22:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0990b6ba6 -> 4833d8a8c


KAFKA-2558: ServerShutdownTest is failing intermittently

See jira for a description.

Author: fpj <fpj@apache.org>

Reviewers: Onur Karaman, Ismael Juma, Guozhang Wang

Closes #224 from fpj/KAFKA-2558


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4833d8a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4833d8a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4833d8a8

Branch: refs/heads/trunk
Commit: 4833d8a8c34b2fb86a5cf605ea5483d0b9eadc4f
Parents: 0990b6b
Author: Flavio Junqueira <fpj@apache.org>
Authored: Mon Sep 21 18:25:42 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Sep 21 18:25:42 2015 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   | 14 +++++++++----
 .../kafka/controller/KafkaController.scala      |  4 ++--
 .../main/scala/kafka/server/KafkaServer.scala   |  4 ++--
 .../kafka/server/ReplicaFetcherManager.scala    | 10 +++++++--
 .../scala/kafka/server/ReplicaManager.scala     |  5 +++--
 .../unit/kafka/server/ReplicaManagerTest.scala  |  4 ++--
 .../server/ServerGenerateBrokerIdTest.scala     | 22 ++++++++++----------
 .../unit/kafka/server/ServerShutdownTest.scala  | 12 ++++-------
 .../test/scala/unit/kafka/utils/TestUtils.scala |  4 ++--
 9 files changed, 44 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 03234dc..0d62c96 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -36,7 +36,7 @@ import kafka.common.{KafkaException, TopicAndPartition}
 import collection.Set
 import collection.JavaConverters._
 
-class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig,
time: Time, metrics: Metrics) extends Logging {
+class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig,
time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
   protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
@@ -109,7 +109,12 @@ class ControllerChannelManager(controllerContext: ControllerContext,
config: Kaf
         Selectable.USE_DEFAULT_BUFFER_SIZE
       )
     }
-    val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker,
messageQueue, networkClient, brokerNode, config, time)
+    val threadName = threadNamePrefix match {
+      case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
+      case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name,config.brokerId,
broker.id)
+    }
+
+    val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker,
messageQueue, networkClient, brokerNode, config, time, threadName)
     requestThread.setDaemon(false)
     brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode,
broker, messageQueue, requestThread))
   }
@@ -141,8 +146,9 @@ class RequestSendThread(val controllerId: Int,
                         val networkClient: NetworkClient,
                         val brokerNode: Node,
                         val config: KafkaConfig,
-                        val time: Time)
-  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId,
toBroker.id)) {
+                        val time: Time,
+                        name: String)
+  extends ShutdownableThread(name = name) {
 
   private val lock = new Object()
   private val stateChangeLogger = KafkaController.stateChangeLogger

http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 29448b1..54a31c6 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -153,7 +153,7 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState,
time: Time, metrics: Metrics) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState,
time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with
KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -816,7 +816,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
   }
 
   private def startChannelManager() {
-    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext,
config, time, metrics)
+    controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext,
config, time, metrics, threadNamePrefix)
     controllerContext.controllerChannelManager.startup()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f3f1fa6..5cc9c5d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -85,7 +85,7 @@ object KafkaServer {
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with
KafkaMetricsGroup {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String]
= None) extends Logging with KafkaMetricsGroup {
   private val startupComplete = new AtomicBoolean(false)
   private val isShuttingDown = new AtomicBoolean(false)
   private val isStartingUp = new AtomicBoolean(false)
@@ -183,7 +183,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends
Logg
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, brokerState, kafkaMetricsTime,
metrics)
+        kafkaController = new KafkaController(config, zkClient, brokerState, kafkaMetricsTime,
metrics, threadNamePrefix)
         kafkaController.startup()
 
         /* start kafka coordinator */

http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 6e845e9..779876b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -21,12 +21,18 @@ import kafka.cluster.BrokerEndPoint
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 
-class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics:
Metrics, time: Time)
+class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics:
Metrics, time: Time, threadNamePrefix: Option[String] = None)
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
                                        "Replica", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread
= {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id),
sourceBroker, brokerConfig,
+    val threadName = threadNamePrefix match {
+      case None =>
+        "ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id)
+      case Some(p) =>
+        "%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
+    }
+    new ReplicaFetcherThread(threadName, sourceBroker, brokerConfig,
       replicaMgr, metrics, time)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3e287ea..c0fec67 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -102,13 +102,14 @@ class ReplicaManager(val config: KafkaConfig,
                      val zkClient: ZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,
-                     val isShuttingDown: AtomicBoolean) extends Logging with KafkaMetricsGroup
{
+                     val isShuttingDown: AtomicBoolean,
+                     threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup
{
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
   private val allPartitions = new Pool[(String, Int), Partition]
   private val replicaStateChangeLock = new Object
-  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime)
+  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath,
new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f260a71..301e268 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -83,7 +83,7 @@ class ReplicaManagerTest {
     val time: MockTime = new MockTime()
     val jTime = new JMockTime
     val rm = new ReplicaManager(config, new Metrics, time, jTime, zkClient, new MockScheduler(time),
mockLogMgr,
-      new AtomicBoolean(false))
+      new AtomicBoolean(false), Option(this.getClass.getName))
     val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
     def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
       assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)
@@ -93,7 +93,7 @@ class ReplicaManagerTest {
 
     rm.shutdown(false)
 
-    TestUtils.verifyNonDaemonThreadsStatus
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
 
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 1185a6f..9afb2ca 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -42,7 +42,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
 
   @Test
   def testAutoGenerateBrokerId() {
-    var server1 = new KafkaServer(config1)
+    var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
     server1.startup()
     server1.shutdown()
     assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
@@ -52,14 +52,14 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     assertEquals(server1.config.brokerId, 1001)
     server1.shutdown()
     CoreUtils.rm(server1.config.logDirs)
-    TestUtils.verifyNonDaemonThreadsStatus
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
   @Test
   def testUserConfigAndGeneratedBrokerId() {
     // start the server with broker.id as part of config
-    val server1 = new KafkaServer(config1)
-    val server2 = new KafkaServer(config2)
+    val server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
+    val server2 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName))
     val props3 = TestUtils.createBrokerConfig(-1, zkConnect)
     val config3 = KafkaConfig.fromProps(props3)
     val server3 = new KafkaServer(config3)
@@ -78,7 +78,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     CoreUtils.rm(server1.config.logDirs)
     CoreUtils.rm(server2.config.logDirs)
     CoreUtils.rm(server3.config.logDirs)
-    TestUtils.verifyNonDaemonThreadsStatus
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
   @Test
@@ -88,7 +88,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     "," + TestUtils.tempDir().getAbsolutePath
     props1.setProperty("log.dir",logDirs)
     config1 = KafkaConfig.fromProps(props1)
-    var server1 = new KafkaServer(config1)
+    var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
     server1.startup()
     server1.shutdown()
     assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
@@ -96,21 +96,21 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
     props1.setProperty("log.dir",newLogDirs)
     config1 = KafkaConfig.fromProps(props1)
-    server1 = new KafkaServer(config1)
+    server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
     server1.startup()
     server1.shutdown()
     assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
     CoreUtils.rm(server1.config.logDirs)
-    TestUtils.verifyNonDaemonThreadsStatus
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
   @Test
   def testConsistentBrokerIdFromUserConfigAndMetaProps() {
     // check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException
-    var server1 = new KafkaServer(config1) //auto generate broker Id
+    var server1 = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
//auto generate broker Id
     server1.startup()
     server1.shutdown()
-    server1 = new KafkaServer(config2) // user specified broker id
+    server1 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName))
// user specified broker id
     try {
       server1.startup()
     } catch {
@@ -118,7 +118,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     }
     server1.shutdown()
     CoreUtils.rm(server1.config.logDirs)
-    TestUtils.verifyNonDaemonThreadsStatus
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
   def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 102dba9..b6d5697 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -46,7 +46,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdown() {
-    var server = new KafkaServer(config)
+    var server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
     server.startup()
     var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)),
       encoder = classOf[StringEncoder].getName,
@@ -109,7 +109,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     val newProps = TestUtils.createBrokerConfig(0, zkConnect)
     newProps.setProperty("delete.topic.enable", "true")
     val newConfig = KafkaConfig.fromProps(newProps)
-    val server = new KafkaServer(newConfig)
+    val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
     server.startup()
     server.shutdown()
     server.awaitShutdown()
@@ -122,7 +122,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     val newProps = TestUtils.createBrokerConfig(0, zkConnect)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
     val newConfig = KafkaConfig.fromProps(newProps)
-    val server = new KafkaServer(newConfig)
+    val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
     try {
       server.startup()
       fail("Expected KafkaServer setup to fail, throw exception")
@@ -146,11 +146,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
   }
 
   private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = {
-    val threadName = Option(t.getClass.getCanonicalName)
-      .getOrElse(t.getClass.getName())
-      .toLowerCase
-
-    !t.isDaemon && t.isAlive && threadName.startsWith("kafka")
+    !t.isDaemon && t.isAlive && t.getName.startsWith(this.getClass.getName)
   }
 
   def verifyNonDaemonThreadsStatus() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4833d8a8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 09b8444..b01adc8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -756,10 +756,10 @@ object TestUtils extends Logging {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
 
-  def verifyNonDaemonThreadsStatus() {
+  def verifyNonDaemonThreadsStatus(threadNamePrefix: String) {
     assertEquals(0, Thread.getAllStackTraces.keySet().toArray
       .map(_.asInstanceOf[Thread])
-      .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka")))
+      .count(t => !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)))
   }
 
   /**


Mime
View raw message