kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-5473; handle ZK session expiration properly when a new session can't be established
Date Fri, 15 Dec 2017 22:48:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 529786638 -> e5daa40e3


KAFKA-5473; handle ZK session expiration properly when a new session can't be established

(WIP: this commit isn't ready to be reviewed yet. I was checking the travis-ci build with
the configuration changes in my account and opened the PR prematurely against trunk. I will
make it consistent with Contribution guidelines once it's well tested.)

https://issues.apache.org/jira/browse/KAFKA-5473

Design:
`zookeeper.connection.retry.timeout.ms` => this determines how long to wait before triggering
the shutdown. The default is 60000ms.

Currently the implementation handles `handleSessionEstablishmentError` only by waiting
for the sessionTimeout.

Author: Prasanna Gautam <prasannagautam@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #3990 from prasincs/KAFKA-5473


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5daa40e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5daa40e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5daa40e

Branch: refs/heads/trunk
Commit: e5daa40e316261e8e6cb8866ad9a4eedcf17f919
Parents: 5297866
Author: Prasanna Gautam <prasannagautam@gmail.com>
Authored: Fri Dec 15 14:48:30 2017 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Dec 15 14:48:30 2017 -0800

----------------------------------------------------------------------
 bin/kafka-server-stop.sh                        | 10 +--
 bin/zookeeper-server-stop.sh                    |  8 +-
 .../kafka/controller/KafkaController.scala      | 19 ++++-
 .../scala/kafka/server/KafkaHealthcheck.scala   | 89 ++++++++------------
 .../main/scala/kafka/server/KafkaServer.scala   | 59 +++++++++----
 .../src/main/scala/kafka/zk/KafkaZkClient.scala |  8 ++
 core/src/main/scala/kafka/zk/ZkData.scala       | 21 +++++
 .../server/ServerGenerateBrokerIdTest.scala     |  4 +-
 .../unit/kafka/server/ServerStartupTest.scala   |  8 +-
 9 files changed, 140 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/bin/kafka-server-stop.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-server-stop.sh b/bin/kafka-server-stop.sh
index d3c660c..de26379 100755
--- a/bin/kafka-server-stop.sh
+++ b/bin/kafka-server-stop.sh
@@ -5,20 +5,20 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+SIGNAL=${SIGNAL:-TERM}
 PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
 
 if [ -z "$PIDS" ]; then
   echo "No kafka server to stop"
   exit 1
-else 
-  kill -s TERM $PIDS
+else
+  kill -s $SIGNAL $PIDS
 fi
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/bin/zookeeper-server-stop.sh
----------------------------------------------------------------------
diff --git a/bin/zookeeper-server-stop.sh b/bin/zookeeper-server-stop.sh
index f771064..dcaa325 100755
--- a/bin/zookeeper-server-stop.sh
+++ b/bin/zookeeper-server-stop.sh
@@ -5,20 +5,20 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+SIGNAL=${SIGNAL:-TERM}
 PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
 
 if [ -z "$PIDS" ]; then
   echo "No zookeeper server to stop"
   exit 1
 else
-  kill -s TERM $PIDS
+  kill -s $SIGNAL $PIDS
 fi
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b626ade..f272851 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -54,7 +54,7 @@ object KafkaController extends Logging {
 
 }
 
-class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics:
Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics:
Metrics, brokerInfo: BrokerInfo, threadNamePrefix: Option[String] = None) extends Logging
with KafkaMetricsGroup {
   this.logIdent = s"[Controller id=${config.brokerId}] "
 
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext
= true, None)
@@ -148,7 +148,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
       override val name: String = StateChangeHandlers.ControllerHandler
       override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
       override def afterInitializingSession(): Unit = {
-        eventManager.put(Reelect)
+        eventManager.put(RegisterBrokerAndReelect)
       }
       override def beforeInitializingSession(): Unit = {
         val expireEvent = new Expire
@@ -1411,6 +1411,21 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient,
time: Ti
     }
   }
 
+  case object RegisterBrokerAndReelect extends ControllerEvent {
+    override def state: ControllerState = ControllerState.ControllerChange
+
+    override def process(): Unit = {
+      zkClient.registerBrokerInZk(brokerInfo)
+      val wasActiveBeforeChange = isActive
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+      activeControllerId = zkClient.getControllerId.getOrElse(-1)
+      if (wasActiveBeforeChange && !isActive) {
+        onControllerResignation()
+      }
+      elect()
+    }
+  }
+
   // We can't make this a case object due to the countDownLatch field
   class Expire extends ControllerEvent {
     private val countDownLatch = new CountDownLatch(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 0edc07a..2ad8168 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -1,19 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 
 package kafka.server
 
@@ -33,13 +33,13 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import scala.collection.mutable.Set
 
 /**
- * This class registers the broker in zookeeper to allow 
- * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
- *   /brokers/ids/[0...N] --> advertisedHost:advertisedPort
- *   
- * Right now our definition of health is fairly naive. If we register in zk we are healthy,
otherwise
- * we are dead.
- */
+  * This class registers the broker in zookeeper to allow
+  * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
+  *   /brokers/ids/[0...N] --> advertisedHost:advertisedPort
+  *
+  * Right now our definition of health is fairly naive. If we register in zk we are healthy,
otherwise
+  * we are dead.
+  */
 class KafkaHealthcheck(brokerId: Int,
                        advertisedEndpoints: Seq[EndPoint],
                        zkUtils: ZkUtils,
@@ -50,37 +50,16 @@ class KafkaHealthcheck(brokerId: Int,
 
   def startup() {
     zkUtils.subscribeStateChanges(sessionExpireListener)
-    register()
-  }
-
-  /**
-   * Register this broker as "alive" in zookeeper
-   */
-  def register() {
-    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    val updatedEndpoints = advertisedEndpoints.map(endpoint =>
-      if (endpoint.host == null || endpoint.host.trim.isEmpty)
-        endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
-      else
-        endpoint
-    )
-
-    // the default host and port are here for compatibility with older clients that only
support PLAINTEXT
-    // we choose the first plaintext port, if there is one
-    // or we register an empty endpoint, which means that older clients will not be able
to connect
-    val plaintextEndpoint = updatedEndpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse(
-      new EndPoint(null, -1, null, null))
-    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port,
updatedEndpoints, jmxPort, rack,
-      interBrokerProtocolVersion)
+    // registration is done in KafkaServer now
   }
 
   def shutdown(): Unit = sessionExpireListener.shutdown()
 
   /**
-   *  When we get a SessionExpired event, it means that we have lost all ephemeral nodes
and ZKClient has re-established
-   *  a connection for us. We need to re-register this broker in the broker registry. We
rely on `handleStateChanged`
-   *  to record ZooKeeper connection state metrics.
-   */
+    *  When we get a SessionExpired event, it means that we have lost all ephemeral nodes
and ZKClient has re-established
+    *  a connection for us. We need to re-register this broker in the broker registry. We
rely on `handleStateChanged`
+    *  to record ZooKeeper connection state metrics.
+    */
   class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup {
 
     private val metricNames = Set[String]()
@@ -117,18 +96,18 @@ class KafkaHealthcheck(brokerId: Int,
 
     @throws[Exception]
     override def handleNewSession() {
-      info("re-registering broker info in ZK for broker " + brokerId)
-      register()
-      info("done re-registering broker")
-      info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
+      //info("re-registering broker info in ZK for broker " + brokerId)
+      //register()
+      //info("done re-registering broker")
+      //info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
     }
 
-    override def handleSessionEstablishmentError(error: Throwable) {
-      fatal("Could not establish session with zookeeper", error)
+    override def handleSessionEstablishmentError(err: Throwable) {
+      error("Could not establish session with zookeeper", err)
     }
 
     def shutdown(): Unit = metricNames.foreach(removeMetric(_))
 
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 234923a..5bd3f8e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -18,14 +18,14 @@
 package kafka.server
 
 import java.io.{File, IOException}
-import java.net.SocketTimeoutException
+import java.net.{InetAddress, SocketTimeoutException}
 import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_9_0
-import kafka.cluster.Broker
+import kafka.cluster.{Broker, EndPoint}
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
@@ -36,7 +36,7 @@ import kafka.network.SocketServer
 import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
-import kafka.zk.KafkaZkClient
+import kafka.zk.{BrokerInfo, KafkaZkClient}
 import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
@@ -44,6 +44,7 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
@@ -145,6 +146,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private var _clusterId: String = null
   private var _brokerTopicStats: BrokerTopicStats = null
 
+
   def clusterId: String = _clusterId
 
   // Visible for testing
@@ -242,8 +244,44 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM,
threadNameP
         replicaManager = createReplicaManager(isShuttingDown)
         replicaManager.startup()
 
+        /* tell everyone we are alive */
+        val listeners = config.advertisedListeners.map { endpoint =>
+          if (endpoint.port == 0)
+            endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
+          else
+            endpoint
+        }
+
+        // to be cleaned up in KAFKA-6320
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
+          config.interBrokerProtocolVersion)
+        kafkaHealthcheck.startup()
+        // KAFKA-6320
+
+        val updatedEndpoints = listeners.map(endpoint =>
+          if (endpoint.host == null || endpoint.host.trim.isEmpty)
+            endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
+          else
+            endpoint
+        )
+
+        // the default host and port are here for compatibility with older clients that only
support PLAINTEXT
+        // we choose the first plaintext port, if there is one
+        // or we register an empty endpoint, which means that older clients will not be able
to connect
+        val plaintextEndpoint = updatedEndpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse(
+          new EndPoint(null, -1, null, null))
+
+        val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+        val brokerInfo = new BrokerInfo(config.brokerId,
+          plaintextEndpoint.host, plaintextEndpoint.port,
+          updatedEndpoints, jmxPort, config.rack, config.interBrokerProtocolVersion)
+        zkClient.registerBrokerInZk(brokerInfo)
+
+        // Now that the broker id is successfully registered via KafkaZkClient, checkpoint
it
+        checkpointBrokerId(config.brokerId)
+
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo,
threadNamePrefix)
         kafkaController.startup()
 
         adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
@@ -285,19 +323,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM,
threadNameP
         dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
-        /* tell everyone we are alive */
-        val listeners = config.advertisedListeners.map { endpoint =>
-          if (endpoint.port == 0)
-            endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
-          else
-            endpoint
-        }
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
-          config.interBrokerProtocolVersion)
-        kafkaHealthcheck.startup()
-
-        // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint
it
-        checkpointBrokerId(config.brokerId)
 
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 97ae11e..d2bf881 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -18,6 +18,7 @@ package kafka.zk
 
 import java.util.Properties
 
+
 import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
@@ -73,6 +74,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean,
time: T
     createResponse.name
   }
 
+  def registerBrokerInZk(brokerInfo: BrokerInfo): Unit = {
+    val brokerIdPath = brokerInfo.path()
+    checkedEphemeralCreate(brokerIdPath, brokerInfo.encode())
+    info("Registered broker %d at path %s with addresses: %s".format(brokerInfo.id, brokerIdPath,
brokerInfo.endpoints()))
+  }
+
+
   /**
    * Gets topic partition states for the given partitions.
    * @param partitions the partitions for which we want ot get states.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 2223001..8688104 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -60,6 +60,27 @@ object BrokerIdsZNode {
   def encode: Array[Byte] = null
 }
 
+class BrokerInfo(val id: Int,
+                 host: String,
+                 port: Int,
+                 advertisedEndpoints: Seq[EndPoint],
+                 jmxPort: Int,
+                 rack: Option[String],
+                 apiVersion: ApiVersion) {
+
+  def path(): String = {
+    BrokerIdZNode.path(id)
+  }
+
+  def endpoints(): String = {
+    advertisedEndpoints.mkString(",")
+  }
+
+  def encode(): Array[Byte] = {
+    BrokerIdZNode.encode(id, host, port, advertisedEndpoints, jmxPort, rack, apiVersion)
+  }
+}
+
 object BrokerIdZNode {
   def path(id: Int) = s"${BrokerIdsZNode.path}/$id"
   def encode(id: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 0ba133f..29a1fa6 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -24,6 +24,8 @@ import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import java.io.File
 
+import org.apache.zookeeper.KeeperException.NodeExistsException
+
 class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
   var props1: Properties = null
   var config1: KafkaConfig = null
@@ -149,7 +151,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     val propsB = TestUtils.createBrokerConfig(1, zkConnect)
     val configB = KafkaConfig.fromProps(propsB)
     val serverB = new KafkaServer(configB)
-    intercept[RuntimeException] {
+    intercept[NodeExistsException] {
       serverB.startup()
     }
     servers = Seq(serverA)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5daa40e/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index c764369..eedf5f3 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,8 +17,12 @@
 
 package kafka.server
 
+import java.net.BindException
+
+import kafka.common.KafkaException
 import kafka.utils.{TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.zookeeper.KeeperException.NodeExistsException
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -62,7 +66,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
       TestUtils.createServer(KafkaConfig.fromProps(props2))
       fail("Starting a broker with the same port should fail")
     } catch {
-      case _: RuntimeException => // expected
+      case _: KafkaException => // expected
     }
   }
 
@@ -81,7 +85,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
       TestUtils.createServer(KafkaConfig.fromProps(props2))
       fail("Registering a broker with a conflicting id should fail")
     } catch {
-      case _: RuntimeException =>
+      case _: NodeExistsException =>
       // this is expected
     }
 


Mime
View raw message