kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3916; Check for disconnects properly before sending from the controller
Date Tue, 23 Aug 2016 02:21:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 05d00b5ac -> 24fd025d4


KAFKA-3916; Check for disconnects properly before sending from the controller

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1734 from hachikuji/KAFKA-3916


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24fd025d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24fd025d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24fd025d

Branch: refs/heads/trunk
Commit: 24fd025d407d42176847143d3a0dc416a75d8f35
Parents: 05d00b5
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Aug 23 02:55:17 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Aug 23 02:59:30 2016 +0100

----------------------------------------------------------------------
 .../apache/kafka/common/network/Selector.java   | 125 +++++++++++++------
 .../controller/ControllerChannelManager.scala   |  19 ++-
 .../kafka/utils/NetworkClientBlockingOps.scala  |  58 ++++-----
 3 files changed, 120 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/24fd025d/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index ab9dab9..5244710 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory;
  */
 public class Selector implements Selectable {
 
+    public static final long NO_IDLE_TIMEOUT_MS = -1;
     private static final Logger log = LoggerFactory.getLogger(Selector.class);
 
     private final java.nio.channels.Selector nioSelector;
@@ -93,25 +94,36 @@ public class Selector implements Selectable {
     private final String metricGrpPrefix;
     private final Map<String, String> metricTags;
     private final ChannelBuilder channelBuilder;
-    private final Map<String, Long> lruConnections;
-    private final long connectionsMaxIdleNanos;
     private final int maxReceiveSize;
     private final boolean metricsPerConnection;
-    private long currentTimeNanos;
-    private long nextIdleCloseCheckTime;
-
+    private final IdleExpiryManager idleExpiryManager;
 
     /**
      * Create a new nioSelector
+     *
+     * @param maxReceiveSize Max size in bytes of a single network receive (use {@link NetworkReceive#UNLIMITED} for no limit)
+     * @param connectionMaxIdleMs Max idle connection time (use {@link #NO_IDLE_TIMEOUT_MS} to disable idle timeout)
+     * @param metrics Registry for Selector metrics
+     * @param time Time implementation
+     * @param metricGrpPrefix Prefix for the group of metrics registered by Selector
+     * @param metricTags Additional tags to add to metrics registered by Selector
+     * @param metricsPerConnection Whether or not to enable per-connection metrics
+     * @param channelBuilder Channel builder for every new connection
      */
-    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time,
String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection,
ChannelBuilder channelBuilder) {
+    public Selector(int maxReceiveSize,
+                    long connectionMaxIdleMs,
+                    Metrics metrics,
+                    Time time,
+                    String metricGrpPrefix,
+                    Map<String, String> metricTags,
+                    boolean metricsPerConnection,
+                    ChannelBuilder channelBuilder) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
             throw new KafkaException(e);
         }
         this.maxReceiveSize = maxReceiveSize;
-        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
         this.time = time;
         this.metricGrpPrefix = metricGrpPrefix;
         this.metricTags = metricTags;
@@ -125,11 +137,8 @@ public class Selector implements Selectable {
         this.failedSends = new ArrayList<>();
         this.sensors = new SelectorMetrics(metrics);
         this.channelBuilder = channelBuilder;
-        // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
-        this.lruConnections = new LinkedHashMap<>(16, .75F, true);
-        currentTimeNanos = time.nanoseconds();
-        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
         this.metricsPerConnection = metricsPerConnection;
+        this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time,
connectionMaxIdleMs);
     }
 
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix,
ChannelBuilder channelBuilder) {
@@ -276,22 +285,26 @@ public class Selector implements Selectable {
         long startSelect = time.nanoseconds();
         int readyKeys = select(timeout);
         long endSelect = time.nanoseconds();
-        currentTimeNanos = endSelect;
         this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
 
         if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
-            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
-            pollSelectionKeys(immediatelyConnectedKeys, true);
+            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
+            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
         }
 
         addToCompletedReceives();
 
         long endIo = time.nanoseconds();
         this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
-        maybeCloseOldestConnection();
+
+        // we use the time at the end of select to ensure that we don't close any connections that
+        // have just been processed in pollSelectionKeys
+        maybeCloseOldestConnection(endSelect);
     }
 
-    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected)
{
+    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
+                                   boolean isImmediatelyConnected,
+                                   long currentTimeNanos) {
         Iterator<SelectionKey> iterator = selectionKeys.iterator();
         while (iterator.hasNext()) {
             SelectionKey key = iterator.next();
@@ -300,7 +313,8 @@ public class Selector implements Selectable {
 
             // register all per-connection metrics at once
             sensors.maybeRegisterConnectionMetrics(channel.id());
-            lruConnections.put(channel.id(), currentTimeNanos);
+            if (idleExpiryManager != null)
+                idleExpiryManager.update(channel.id(), currentTimeNanos);
 
             try {
 
@@ -409,24 +423,20 @@ public class Selector implements Selectable {
             unmute(channel);
     }
 
-    private void maybeCloseOldestConnection() {
-        if (currentTimeNanos > nextIdleCloseCheckTime) {
-            if (lruConnections.isEmpty()) {
-                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
-            } else {
-                Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
-                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
-                nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
-                if (currentTimeNanos > nextIdleCloseCheckTime) {
-                    String connectionId = oldestConnectionEntry.getKey();
-                    if (log.isTraceEnabled())
-                        log.trace("About to close the idle connection from " + connectionId
-                                + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime)
/ 1000 / 1000 + " millis");
-
-                    disconnected.add(connectionId);
-                    close(connectionId);
-                }
-            }
+    private void maybeCloseOldestConnection(long currentTimeNanos) {
+        if (idleExpiryManager == null)
+            return;
+
+        Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos);
+        if (expiredConnection != null) {
+            String connectionId = expiredConnection.getKey();
+
+            if (log.isTraceEnabled())
+                log.trace("About to close the idle connection from {} due to being idle for
{} millis",
+                        connectionId, (currentTimeNanos - expiredConnection.getValue()) /
1000 / 1000);
+
+            disconnected.add(connectionId);
+            close(connectionId);
         }
     }
 
@@ -480,8 +490,10 @@ public class Selector implements Selectable {
         }
         this.stagedReceives.remove(channel);
         this.channels.remove(channel.id());
-        this.lruConnections.remove(channel.id());
         this.sensors.connectionClosed.record();
+
+        if (idleExpiryManager != null)
+            idleExpiryManager.remove(channel.id());
     }
 
 
@@ -726,4 +738,45 @@ public class Selector implements Selectable {
         }
     }
 
+    // helper class for tracking least recently used connections to enable idle connection closing
+    private static class IdleExpiryManager {
+        private final Map<String, Long> lruConnections;
+        private final long connectionsMaxIdleNanos;
+        private long nextIdleCloseCheckTime;
+
+        public IdleExpiryManager(Time time, long connectionsMaxIdleMs) {
+            this.connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000;
+            // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
+            this.lruConnections = new LinkedHashMap<>(16, .75F, true);
+            this.nextIdleCloseCheckTime = time.nanoseconds() + this.connectionsMaxIdleNanos;
+        }
+
+        public void update(String connectionId, long currentTimeNanos) {
+            lruConnections.put(connectionId, currentTimeNanos);
+        }
+
+        public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos)
{
+            if (currentTimeNanos <= nextIdleCloseCheckTime)
+                return null;
+
+            if (lruConnections.isEmpty()) {
+                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
+                return null;
+            }
+
+            Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
+            Long connectionLastActiveTime = oldestConnectionEntry.getValue();
+            nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;
+
+            if (currentTimeNanos > nextIdleCloseCheckTime)
+                return oldestConnectionEntry;
+            else
+                return null;
+        }
+
+        public void remove(String connectionId) {
+            lruConnections.remove(connectionId);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/24fd025d/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3007004..c46a536 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -100,7 +100,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config:
Kaf
       )
       val selector = new Selector(
         NetworkReceive.UNLIMITED,
-        config.connectionsMaxIdleMs,
+        Selector.NO_IDLE_TIMEOUT_MS,
         metrics,
         time,
         "controller-channel",
@@ -167,7 +167,7 @@ class RequestSendThread(val controllerId: Int,
 
   override def doWork(): Unit = {
 
-    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))
+    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))
 
     val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
     import NetworkClientBlockingOps._
@@ -226,18 +226,13 @@ class RequestSendThread(val controllerId: Int,
   private def brokerReady(): Boolean = {
     import NetworkClientBlockingOps._
     try {
+      val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)
 
-      if (networkClient.isReady(brokerNode, time.milliseconds()))
-        true
-      else {
-        val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)
+      if (!ready)
+        throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
-        if (!ready)
-          throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
-
-        info("Controller %d connected to %s for sending state change requests".format(controllerId,
brokerNode.toString()))
-        true
-      }
+      info("Controller %d connected to %s for sending state change requests".format(controllerId,
brokerNode.toString()))
+      true
     } catch {
       case e: Throwable =>
         warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId,
brokerNode.toString()), e)

http://git-wip-us.apache.org/repos/asf/kafka/blob/24fd025d/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index fd4af6e..9aca663 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -45,24 +45,42 @@ object NetworkClientBlockingOps {
 class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
 
   /**
-   * Invokes `client.ready` followed by 0 or more `client.poll` invocations until the connection to `node` is ready,
-   * the timeout expires or the connection fails.
+   * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
+   * invocations until the connection to `node` is ready, the timeout expires or the connection fails.
    *
    * It returns `true` if the call completes normally or `false` if the timeout expires.
If the connection fails,
-   * an `IOException` is thrown instead.
+   * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive
+   * connection timeout, it is possible for this method to raise an `IOException` for a previous connection which
+   * has recently disconnected.
    *
   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
    * care.
    */
   def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
     require(timeout >=0, "timeout should be >= 0")
-    client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
-      if (client.isReady(node, now))
+
+    val startTime = time.milliseconds()
+    val expiryTime = startTime + timeout
+
+    @tailrec
+    def awaitReady(iterationStartTime: Long): Boolean = {
+      if (client.isReady(node, iterationStartTime))
         true
       else if (client.connectionFailed(node))
         throw new IOException(s"Connection to $node failed")
-      else false
+      else {
+        val pollTimeout = expiryTime - iterationStartTime
+        client.poll(pollTimeout, iterationStartTime)
+        val afterPollTime = time.milliseconds()
+        if (afterPollTime < expiryTime) awaitReady(afterPollTime)
+        else false
+      }
     }
+
+    // poll once to receive pending disconnects
+    client.poll(0, startTime)
+
+    client.ready(node, startTime) || awaitReady(startTime)
   }
 
   /**
@@ -93,34 +111,6 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal
{
   }
 
   /**
-   * Invokes `client.poll` until `predicate` returns `true` or the timeout expires.
-   *
-   * It returns `true` if the call completes normally or `false` if the timeout expires. Exceptions thrown via
-   * `predicate` are not handled and will bubble up.
-   *
-   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
-   * care.
-   */
-  private def pollUntil(timeout: Long)(predicate: (Seq[ClientResponse], Long) => Boolean)(implicit
time: JTime): Boolean = {
-    val methodStartTime = time.milliseconds()
-    val timeoutExpiryTime = methodStartTime + timeout
-
-    @tailrec
-    def recursivePoll(iterationStartTime: Long): Boolean = {
-      val pollTimeout = timeoutExpiryTime - iterationStartTime
-      val responses = client.poll(pollTimeout, iterationStartTime).asScala
-      if (predicate(responses, iterationStartTime)) true
-      else {
-        val afterPollTime = time.milliseconds()
-        if (afterPollTime < timeoutExpiryTime) recursivePoll(afterPollTime)
-        else false
-      }
-    }
-
-    recursivePoll(methodStartTime)
-  }
-
-  /**
     * Invokes `client.poll` until `collect` returns `Some`. The value inside `Some` is returned.
     *
     * Exceptions thrown via `collect` are not handled and will bubble up.


Mime
View raw message