kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4959; Remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine
Date Tue, 28 Mar 2017 22:53:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 23a0f0986 -> edb372dca

KAFKA-4959; Remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine

This brought down a cluster by causing continuous controller moves.

ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects that aren't thread-safe:
* Selector
* NetworkClient
* SSLEngine (this was the big one for us. We turn on SSL for interbroker communication).

As per the "Concurrency Notes" section from https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html:
> two threads must not attempt to call the same method (either wrap() or unwrap()) concurrently

SSLEngine.wrap gets called in:
* SslTransportLayer.write
* SslTransportLayer.handshake
* SslTransportLayer.close

It turns out that the ZkEventThread and RequestSendThread can concurrently call SSLEngine.wrap:
* ZkEventThread calls SslTransportLayer.close from ControllerChannelManager.removeExistingBroker
* RequestSendThread can call SslTransportLayer.write or SslTransportLayer.handshake from NetworkClient.poll

Suppose the controller moves for whatever reason. The former controller could have had a RequestSendThread
that was in the middle of sending out messages to the cluster while the ZkEventThread began executing
KafkaController.onControllerResignation, which calls ControllerChannelManager.shutdown, which sequentially
cleans up the controller-to-broker queue and connection for every broker in the cluster. This cleanup
includes the call to ControllerChannelManager.removeExistingBroker as mentioned earlier, causing the
concurrent call to SSLEngine.wrap. This concurrent call throws a BufferOverflowException, which
ControllerChannelManager.removeExistingBroker catches, so ControllerChannelManager.shutdown moves on to
cleaning up the next controller-to-broker queue and connection, skipping the remaining cleanup steps for
that broker, such as clearing the queue, stopping the RequestSendThread, and removing the entry from its
brokerStateInfo map.

By failing out of the Selector.close, the sensors corresponding to the broker connection have not been
cleaned up. Any later attempt at initializing an identical Selector will result in a sensor collision and
therefore cause Selector initialization to throw an exception. In other words, any later attempts by this
broker to become controller again will fail on initialization. When controller initialization fails, the
controller deletes the /controller znode and lets another broker take over.
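
The sensor collision described above can be illustrated with a small stand-alone sketch. Nothing here is Kafka code: `Registry`, `FakeSelector`, the sensor name, and the choice of `IllegalArgumentException` are all hypothetical stand-ins, assuming only that sensor names must be unique and that a close which fails part-way leaves its sensors registered.

```scala
import java.nio.BufferOverflowException
import scala.collection.mutable
import scala.util.Try

object SensorCollisionSketch {

  // Hypothetical registry in which every sensor name must be unique.
  final class Registry {
    private val names = mutable.Set.empty[String]

    def register(name: String): Unit =
      if (!names.add(name))
        throw new IllegalArgumentException(s"sensor '$name' is already registered")

    def unregister(name: String): Unit = { names.remove(name); () }

    def contains(name: String): Boolean = names.contains(name)
  }

  // Hypothetical selector: registers one sensor when created, removes it on close.
  final class FakeSelector(registry: Registry, brokerId: Int, failOnClose: Boolean) {
    private val sensor = s"broker-$brokerId-bytes-sent"
    registry.register(sensor)

    def close(): Unit = {
      // Simulates the concurrent SSLEngine.wrap failure interrupting the close.
      if (failOnClose) throw new BufferOverflowException()
      registry.unregister(sensor) // never reached when the close fails
    }
  }

  def main(args: Array[String]): Unit = {
    val registry = new Registry
    val first = new FakeSelector(registry, brokerId = 1, failOnClose = true)

    // The caller swallows the exception, as removeExistingBroker does.
    try first.close() catch { case _: BufferOverflowException => () }

    // A later attempt to build an identical selector now collides.
    val retry = Try(new FakeSelector(registry, brokerId = 1, failOnClose = false))
    println(s"re-initialization failed: ${retry.isFailure}")
  }
}
```

The point of the sketch is that the failure is sticky: once the leaked entry exists, every later initialization with the same name fails, which matches the repeated controller initialization failures described in this message.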

Now suppose the controller moves enough times that every broker hits the BufferOverflowException issue.
We're now guaranteed to fail controller initialization due to the sensor collision on every controller
transition, so the controller will move across brokers continuously.

This patch avoids the concurrent use of non-threadsafe classes in ControllerChannelManager.removeExistingBroker
by shutting down the RequestSendThread before closing the NetworkClient.
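
The ordering the patch relies on can be sketched outside Kafka as follows. `UnsafeClient` and `SendThread` are hypothetical stand-ins for NetworkClient and RequestSendThread, and the latch-based `shutdown()` only approximates the ShutdownableThread behaviour referred to in the patch comment; this is an illustration of the hand-off, not the actual implementation.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

object HandOffSketch {

  // Hypothetical stand-in for a non-thread-safe client; fields are deliberately unsynchronized.
  final class UnsafeClient {
    private var closed = false
    private var polls = 0L

    def poll(): Unit = {
      if (closed) throw new IllegalStateException("poll() called after close()")
      polls += 1
    }

    def close(): Unit = { closed = true }
    def isClosed: Boolean = closed
    def pollCount: Long = polls
  }

  // Hypothetical worker: shutdown() returns only after run() has finished.
  final class SendThread(client: UnsafeClient) extends Thread("send-thread") {
    private val running = new AtomicBoolean(true)
    private val shutdownLatch = new CountDownLatch(1)
    @volatile var failure: Option[Throwable] = None

    override def run(): Unit =
      try {
        while (running.get()) {
          client.poll()
          Thread.sleep(1)
        }
      } catch {
        case t: Throwable => failure = Some(t)
      } finally {
        shutdownLatch.countDown() // signals that this thread no longer touches the client
      }

    def shutdown(): Unit = {
      running.set(false)
      shutdownLatch.await() // barrier: after this returns, the caller owns the client
    }
  }

  // Same ordering as the patch: stop the worker first, then close the client.
  def removeBroker(thread: SendThread, client: UnsafeClient): Unit = {
    thread.shutdown()
    client.close()
  }

  def main(args: Array[String]): Unit = {
    val client = new UnsafeClient
    val thread = new SendThread(client)
    thread.start()
    Thread.sleep(50)
    removeBroker(thread, client)
    println(s"closed=${client.isClosed}, failure=${thread.failure}")
  }
}
```

Because `countDown()` happens-before the return of `await()`, the closing thread sees everything the worker did to the client, and the worker can no longer call `poll()` while `close()` runs. Reversing the two calls in `removeBroker` would reintroduce the race.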

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2746 from onurkaraman/KAFKA-4959

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/edb372dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/edb372dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/edb372dc

Branch: refs/heads/trunk
Commit: edb372dcaf34312b5c1c91c4df5e5ff87586e73b
Parents: 23a0f09
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Tue Mar 28 23:20:20 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Mar 28 23:47:51 2017 +0100

 .../main/scala/kafka/controller/ControllerChannelManager.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 740fff4..a2308b2 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -138,9 +138,13 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
     try {
+      // Shutdown the RequestSendThread before closing the NetworkClient to avoid the concurrent use of the
+      // non-threadsafe classes as described in KAFKA-4959.
+      // The call to shutdownLatch.await() in ShutdownableThread.shutdown() serves as a synchronization barrier that
+      // hands off the NetworkClient from the RequestSendThread to the ZkEventThread.
+      brokerState.requestSendThread.shutdown()
-      brokerState.requestSendThread.shutdown()
     } catch {
       case e: Throwable => error("Error while removing broker by the controller", e)
