kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7712; Remove channel from Selector before propagating exception (#6023)
Date Wed, 12 Dec 2018 18:51:11 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 46e8081  KAFKA-7712; Remove channel from Selector before propagating exception (#6023)
46e8081 is described below

commit 46e8081f9cae24f5edbfa2099e317a33e531d2fb
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Wed Dec 12 18:50:56 2018 +0000

    KAFKA-7712; Remove channel from Selector before propagating exception (#6023)
    
    Ensure that channel and selection keys are removed from `Selector` collections before
propagating connect exceptions. They are currently cleared on the next `poll()`, but we can't
ensure that callers (NetworkClient for example) wont try to connect again before the next
`poll` and hence we should clear the collections before re-throwing exceptions from `connect()`.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../org/apache/kafka/common/network/Selector.java  |  8 ++-
 .../apache/kafka/common/network/SelectorTest.java  | 76 +++++++++++++++++++---
 2 files changed, 72 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 843d46d..8c46746 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -250,10 +250,11 @@ public class Selector implements Selectable, AutoCloseable {
     public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
throws IOException {
         ensureNotRegistered(id);
         SocketChannel socketChannel = SocketChannel.open();
+        SelectionKey key = null;
         try {
             configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
             boolean connected = doConnect(socketChannel, address);
-            SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
+            key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
 
             if (connected) {
                 // OP_CONNECT won't trigger for immediately connected channels
@@ -262,6 +263,9 @@ public class Selector implements Selectable, AutoCloseable {
                 key.interestOps(0);
             }
         } catch (IOException | RuntimeException e) {
+            if (key != null)
+                immediatelyConnectedKeys.remove(key);
+            channels.remove(id);
             socketChannel.close();
             throw e;
         }
@@ -316,7 +320,7 @@ public class Selector implements Selectable, AutoCloseable {
             throw new IllegalStateException("There is already a connection for id " + id
+ " that is still being closed");
     }
 
-    private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps)
throws IOException {
+    protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps)
throws IOException {
         SelectionKey key = socketChannel.register(nioSelector, interestedOps);
         KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
         this.channels.put(id, channel);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 6cf7586..2da1cc6 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -51,6 +51,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -380,16 +381,7 @@ public class SelectorTest {
     @Test
     public void testImmediatelyConnectedCleaned() throws Exception {
         Metrics metrics = new Metrics(); // new metrics object to avoid metric registration
conflicts
-        Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder,
new LogContext()) {
-            @Override
-            protected boolean doConnect(SocketChannel channel, InetSocketAddress address)
throws IOException {
-                // Use a blocking connect to trigger the immediately connected path
-                channel.configureBlocking(true);
-                boolean connected = super.doConnect(channel, address);
-                channel.configureBlocking(false);
-                return connected;
-            }
-        };
+        Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup",
channelBuilder, new LogContext());
 
         try {
             testImmediatelyConnectedCleaned(selector, true);
@@ -400,6 +392,26 @@ public class SelectorTest {
         }
     }
 
+    private static class ImmediatelyConnectingSelector extends Selector {
+        public ImmediatelyConnectingSelector(long connectionMaxIdleMS,
+                                             Metrics metrics,
+                                             Time time,
+                                             String metricGrpPrefix,
+                                             ChannelBuilder channelBuilder,
+                                             LogContext logContext) {
+            super(connectionMaxIdleMS, metrics, time, metricGrpPrefix, channelBuilder, logContext);
+        }
+
+        @Override
+        protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws
IOException {
+            // Use a blocking connect to trigger the immediately connected path
+            channel.configureBlocking(true);
+            boolean connected = super.doConnect(channel, address);
+            channel.configureBlocking(false);
+            return connected;
+        }
+    }
+
     private void testImmediatelyConnectedCleaned(Selector selector, boolean closeAfterFirstPoll)
throws Exception {
         String id = "0";
         selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE,
BUFFER_SIZE);
@@ -412,6 +424,46 @@ public class SelectorTest {
         verifySelectorEmpty(selector);
     }
 
+    /**
+     * Verify that if Selector#connect fails and throws an Exception, all related objects
+     * are cleared immediately before the exception is propagated.
+     */
+    @Test
+    public void testConnectException() throws Exception {
+        Metrics metrics = new Metrics();
+        AtomicBoolean throwIOException = new AtomicBoolean();
+        Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup",
channelBuilder, new LogContext()) {
+            @Override
+            protected SelectionKey registerChannel(String id, SocketChannel socketChannel,
int interestedOps) throws IOException {
+                SelectionKey key = super.registerChannel(id, socketChannel, interestedOps);
+                key.cancel();
+                if (throwIOException.get())
+                    throw new IOException("Test exception");
+                return key;
+            }
+        };
+
+        try {
+            verifyImmediatelyConnectedException(selector, "0");
+            throwIOException.set(true);
+            verifyImmediatelyConnectedException(selector, "1");
+        } finally {
+            selector.close();
+            metrics.close();
+        }
+    }
+
+    private void verifyImmediatelyConnectedException(Selector selector, String id) throws
Exception {
+        try {
+            selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE,
BUFFER_SIZE);
+            fail("Expected exception not thrown");
+        } catch (Exception e) {
+            verifyEmptyImmediatelyConnectedKeys(selector);
+            assertNull("Channel not removed", selector.channel(id));
+            ensureEmptySelectorFields(selector);
+        }
+    }
+
     @Test
     public void testCloseOldestConnectionWithOneStagedReceive() throws Exception {
         verifyCloseOldestConnectionWithStagedReceives(1);
@@ -715,6 +767,10 @@ public class SelectorTest {
         }
         selector.poll(0);
         selector.poll(0); // Poll a second time to clear everything
+        ensureEmptySelectorFields(selector);
+    }
+
+    private void ensureEmptySelectorFields(Selector selector) throws Exception {
         for (Field field : Selector.class.getDeclaredFields()) {
             ensureEmptySelectorField(selector, field);
         }


Mime
View raw message