kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)
Date Fri, 16 Nov 2018 10:23:23 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new ad40bed  KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)
ad40bed is described below

commit ad40bed8d9aea91a684276de165b63dcce116c1f
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri Nov 16 01:27:29 2018 +0000

    KAFKA-7576; Fix shutdown of replica fetcher threads (#5875)
    
    ReplicaFetcherThread.shutdown attempts to close the fetcher's Selector while the thread
    is running. This is unsafe and can result in `Selector.close()` failing with an exception.
    The exception is caught and logged at debug level, but this can lead to a socket leak if the
    shutdown is due to a dynamic config update rather than broker shutdown. This PR changes the
    shutdown logic to close the Selector after the replica fetcher thread is shut down, with a wakeup()
    and flag to terminate blocking sen [...]
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 .../java/org/apache/kafka/clients/KafkaClient.java | 16 ++++++
 .../org/apache/kafka/clients/NetworkClient.java    | 40 ++++++++++++-
 .../apache/kafka/clients/NetworkClientUtils.java   | 33 +++++++----
 .../java/org/apache/kafka/clients/MockClient.java  | 12 ++++
 .../controller/ControllerChannelManager.scala      |  7 +++
 .../kafka/server/ReplicaFetcherBlockingSend.scala  |  6 ++
 .../scala/kafka/server/ReplicaFetcherThread.scala  | 27 ++++++---
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 32 +++++++++++
 .../unit/kafka/server/ServerShutdownTest.scala     | 65 +++++++++++++++++++++-
 .../util/ReplicaFetcherMockBlockingSend.scala      |  2 +
 10 files changed, 217 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 448932e..18a7eef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -197,4 +197,20 @@ public interface KafkaClient extends Closeable {
                                    int requestTimeoutMs,
                                    RequestCompletionHandler callback);
 
+
+
+    /**
+     * Initiates shutdown of this client. This method may be invoked from another thread
while this
+     * client is being polled. No further requests may be sent using the client. The current
poll()
+     * will be terminated using wakeup(). The client should be explicitly shutdown using
{@link #close()}
+     * after poll returns. Note that {@link #close()} should not be invoked concurrently
while polling.
+     */
+    void initiateClose();
+
+    /**
+     * Returns true if the client is still active. Returns false if {@link #initiateClose()}
or {@link #close()}
+     * was invoked for this client.
+     */
+    boolean active();
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index e4ba197..4f24b54 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelState;
@@ -53,6 +54,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -63,6 +65,12 @@ import java.util.stream.Collectors;
  */
 public class NetworkClient implements KafkaClient {
 
+    private enum State {
+        ACTIVE,
+        CLOSING,
+        CLOSED
+    }
+
     private final Logger log;
 
     /* the selector used to perform network i/o */
@@ -111,6 +119,8 @@ public class NetworkClient implements KafkaClient {
 
     private final Sensor throttleTimeSensor;
 
+    private final AtomicReference<State> state;
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -243,6 +253,7 @@ public class NetworkClient implements KafkaClient {
         this.apiVersions = apiVersions;
         this.throttleTimeSensor = throttleTimeSensor;
         this.log = logContext.logger(NetworkClient.class);
+        this.state = new AtomicReference<>(State.ACTIVE);
     }
 
     /**
@@ -418,6 +429,7 @@ public class NetworkClient implements KafkaClient {
     }
 
     private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now)
{
+        ensureActive();
         String nodeId = clientRequest.destination();
         if (!isInternalRequest) {
             // If this request came from outside the NetworkClient, validate
@@ -496,6 +508,8 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
+        ensureActive();
+
         if (!abortedSends.isEmpty()) {
             // If there are aborted sends because of unsupported version exceptions or disconnects,
             // handle them immediately without waiting for Selector#poll.
@@ -575,13 +589,35 @@ public class NetworkClient implements KafkaClient {
         this.selector.wakeup();
     }
 
+    @Override
+    public void initiateClose() {
+        if (state.compareAndSet(State.ACTIVE, State.CLOSING)) {
+            wakeup();
+        }
+    }
+
+    @Override
+    public boolean active() {
+        return state.get() == State.ACTIVE;
+    }
+
+    private void ensureActive() {
+        if (!active())
+            throw new DisconnectException("NetworkClient is no longer active, state is "
+ state);
+    }
+
     /**
      * Close the network client
      */
     @Override
     public void close() {
-        this.selector.close();
-        this.metadataUpdater.close();
+        state.compareAndSet(State.ACTIVE, State.CLOSING);
+        if (state.compareAndSet(State.CLOSING, State.CLOSED)) {
+            this.selector.close();
+            this.metadataUpdater.close();
+        } else {
+            log.warn("Attempting to close NetworkClient that has already been closed.");
+        }
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
index 94fe288..c952b82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.utils.Time;
 
 import java.io.IOException;
@@ -83,25 +84,35 @@ public final class NetworkClientUtils {
      * disconnection happens (which can happen for a number of reasons including a request
timeout).
      *
      * In case of a disconnection, an `IOException` is thrown.
+     * If shutdown is initiated on the client during this method, an IOException is thrown.
      *
      * This method is useful for implementing blocking behaviour on top of the non-blocking
`NetworkClient`, use it with
      * care.
      */
     public static ClientResponse sendAndReceive(KafkaClient client, ClientRequest request,
Time time) throws IOException {
-        client.send(request, time.milliseconds());
-        while (true) {
-            List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
-            for (ClientResponse response : responses) {
-                if (response.requestHeader().correlationId() == request.correlationId())
{
-                    if (response.wasDisconnected()) {
-                        throw new IOException("Connection to " + response.destination() +
" was disconnected before the response was read");
+        try {
+            client.send(request, time.milliseconds());
+            while (client.active()) {
+                List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
+                for (ClientResponse response : responses) {
+                    if (response.requestHeader().correlationId() == request.correlationId())
{
+                        if (response.wasDisconnected()) {
+                            throw new IOException("Connection to " + response.destination()
+ " was disconnected before the response was read");
+                        }
+                        if (response.versionMismatch() != null) {
+                            throw response.versionMismatch();
+                        }
+                        return response;
                     }
-                    if (response.versionMismatch() != null) {
-                        throw response.versionMismatch();
-                    }
-                    return response;
                 }
             }
+            throw new IOException("Client was shutdown before response was read");
+        } catch (DisconnectException e) {
+            if (client.active())
+                throw e;
+            else
+                throw new IOException("Client was shutdown before response was read");
+
         }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6b41a9e..70ffeae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -96,6 +96,7 @@ public class MockClient implements KafkaClient {
     private final Queue<MetadataUpdate> metadataUpdates = new ConcurrentLinkedDeque<>();
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
     private volatile int numBlockingWakeups = 0;
+    private volatile boolean active = true;
 
     public MockClient(Time time) {
         this(time, null);
@@ -532,7 +533,18 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public void initiateClose() {
+        close();
+    }
+
+    @Override
+    public boolean active() {
+        return active;
+    }
+
+    @Override
     public void close() {
+        active = false;
         metadata.close();
     }
 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 096b2b4..6121da4 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -291,6 +291,13 @@ class RequestSendThread(val controllerId: Int,
     }
   }
 
+  override def initiateShutdown(): Boolean = {
+    if (super.initiateShutdown()) {
+      networkClient.initiateClose()
+      true
+    } else
+      false
+  }
 }
 
 class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogger: StateChangeLogger)
extends  Logging {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 4c7adfb..6048c66 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -35,6 +35,8 @@ trait BlockingSend {
 
   def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse
 
+  def initiateClose()
+
   def close()
 }
 
@@ -103,6 +105,10 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
     }
   }
 
+  override def initiateClose(): Unit = {
+    networkClient.initiateClose()
+  }
+
   def close(): Unit = {
     networkClient.close()
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6ad375c..4ac4189 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -101,23 +101,32 @@ class ReplicaFetcherThread(name: String,
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
     if (justShutdown) {
-      // leaderEndpoint.close() can throw an exception when the replica fetcher thread is
still
-      // actively fetching because the selector can close the channel while sending the request
-      // after we initiate leaderEndpoint.close() and the leaderEndpoint.close() itself may
also close
-      // the channel again. When this race condition happens, an exception will be thrown.
-      // Throwing the exception to the caller may fail the ReplicaManager shutdown. It is
safe to catch
-      // the exception without here causing correctness issue because we are going to shutdown
the thread
-      // and will not re-use the leaderEndpoint anyway.
+      // This is thread-safe, so we don't expect any exceptions, but catch and log any errors
+      // to avoid failing the caller, especially during shutdown. We will attempt to close
+      // leaderEndpoint after the thread terminates.
       try {
-        leaderEndpoint.close()
+        leaderEndpoint.initiateClose()
       } catch {
         case t: Throwable =>
-          debug(s"Fail to close leader endpoint $leaderEndpoint after initiating replica
fetcher thread shutdown", t)
+          error(s"Failed to initiate shutdown of leader endpoint $leaderEndpoint after initiating
replica fetcher thread shutdown", t)
       }
     }
     justShutdown
   }
 
+  override def awaitShutdown(): Unit = {
+    super.awaitShutdown()
+    // We don't expect any exceptions here, but catch and log any errors to avoid failing
the caller,
+    // especially during shutdown. It is safe to catch the exception here without causing
correctness
+    // issue because we are going to shutdown the thread and will not re-use the leaderEndpoint
anyway.
+    try {
+      leaderEndpoint.close()
+    } catch {
+      case t: Throwable =>
+        error(s"Failed to close leader endpoint $leaderEndpoint after shutting down replica
fetcher thread", t)
+    }
+  }
+
   // process fetched data
   def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData:
PartitionData) {
     val replica = replicaMgr.getReplicaOrException(topicPartition)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 6fd0837..9f2f710 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -23,6 +23,7 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.metrics.Metrics
@@ -718,6 +719,37 @@ class ReplicaFetcherThreadTest {
     assertEquals(49, truncateToCapture.getValue)
   }
 
+  @Test
+  def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit =
{
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    val config = KafkaConfig.fromProps(props)
+    val mockBlockingSend = createMock(classOf[BlockingSend])
+
+    expect(mockBlockingSend.initiateClose()).andThrow(new IllegalArgumentException()).once()
+    expect(mockBlockingSend.close()).andThrow(new IllegalStateException()).once()
+    replay(mockBlockingSend)
+
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics =  new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(mockBlockingSend))
+    thread.start()
+
+    // Verify that:
+    //   1) IllegalArgumentException thrown by BlockingSend#initiateClose() during `initiateShutdown`
is not propagated
+    //   2) BlockingSend.close() is invoked even if BlockingSend#initiateClose() fails
+    //   3) IllegalStateException thrown by BlockingSend.close() during `awaitShutdown` is
not propagated
+    thread.initiateShutdown()
+    thread.awaitShutdown()
+    verify(mockBlockingSend)
+  }
+
   def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager) = {
     expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes()
     expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 9f966b4..94cf695 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -19,15 +19,24 @@ package kafka.server
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
-import java.io.File
+import java.io.{DataInputStream, File}
+import java.net.ServerSocket
+import java.util.concurrent.{Executors, TimeUnit}
 
+import kafka.cluster.Broker
+import kafka.controller.{ControllerChannelManager, ControllerContext, StateChangeLogger}
 import kafka.log.LogManager
 import kafka.zookeeper.ZooKeeperClientTimeoutException
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer,
StringSerializer}
+import org.apache.kafka.common.utils.Time
 import org.junit.{Before, Test}
 import org.junit.Assert._
 
@@ -190,4 +199,58 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     server.awaitShutdown()
     server.shutdown()
   }
+
+  // Verify that if controller is in the midst of processing a request, shutdown completes
+  // without waiting for request timeout.
+  @Test
+  def testControllerShutdownDuringSend(): Unit = {
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+
+    val controllerId = 2
+    val metrics = new Metrics
+    val executor = Executors.newSingleThreadExecutor
+    var serverSocket: ServerSocket = null
+    var controllerChannelManager: ControllerChannelManager = null
+
+    try {
+      // Set up a server to accept a connection and receive one byte from the first request.
No response is sent.
+      serverSocket = new ServerSocket(0)
+      val receiveFuture = executor.submit(new Runnable {
+        override def run(): Unit = {
+          val socket = serverSocket.accept()
+          new DataInputStream(socket.getInputStream).readByte()
+        }
+      })
+
+      // Start a ControllerChannelManager
+      val brokers = Seq(new Broker(1, "localhost", serverSocket.getLocalPort, listenerName,
securityProtocol))
+      val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId,
zkConnect))
+      val controllerContext = new ControllerContext
+      controllerContext.liveBrokers = brokers.toSet
+      controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig,
Time.SYSTEM,
+        metrics, new StateChangeLogger(controllerId, inControllerContext = true, None))
+      controllerChannelManager.startup()
+
+      // Initiate a sendRequest and wait until connection is established and one byte is
received by the peer
+      val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+        controllerId, 1, Map.empty.asJava, brokers.map(_.node(listenerName)).toSet.asJava)
+      controllerChannelManager.sendRequest(1, ApiKeys.LEADER_AND_ISR, requestBuilder)
+      receiveFuture.get(10, TimeUnit.SECONDS)
+
+      // Shutdown controller. Request timeout is 30s, verify that shutdown completed well
before that
+      val shutdownFuture = executor.submit(new Runnable {
+        override def run(): Unit = controllerChannelManager.shutdown()
+      })
+      shutdownFuture.get(10, TimeUnit.SECONDS)
+
+    } finally {
+      if (serverSocket != null)
+        serverSocket.close()
+      if (controllerChannelManager != null)
+        controllerChannelManager.shutdown()
+      executor.shutdownNow()
+      metrics.close()
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index b7c037e..fdb22b5 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -86,5 +86,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition,
Epoc
       true)
   }
 
+  override def initiateClose(): Unit = {}
+
   override def close(): Unit = {}
 }


Mime
View raw message