This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 46e8081 KAFKA-7712; Remove channel from Selector before propagating exception (#6023) 46e8081 is described below commit 46e8081f9cae24f5edbfa2099e317a33e531d2fb Author: Rajini Sivaram AuthorDate: Wed Dec 12 18:50:56 2018 +0000 KAFKA-7712; Remove channel from Selector before propagating exception (#6023) Ensure that channel and selection keys are removed from `Selector` collections before propagating connect exceptions. They are currently cleared on the next `poll()`, but we can't ensure that callers (NetworkClient for example) wont try to connect again before the next `poll` and hence we should clear the collections before re-throwing exceptions from `connect()`. Reviewers: Jason Gustafson --- .../org/apache/kafka/common/network/Selector.java | 8 ++- .../apache/kafka/common/network/SelectorTest.java | 76 +++++++++++++++++++--- 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 843d46d..8c46746 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -250,10 +250,11 @@ public class Selector implements Selectable, AutoCloseable { public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { ensureNotRegistered(id); SocketChannel socketChannel = SocketChannel.open(); + SelectionKey key = null; try { configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize); boolean connected = doConnect(socketChannel, address); - SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT); + key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT); if (connected) { // OP_CONNECT won't trigger for immediately connected channels @@ -262,6 +263,9 @@ public class Selector implements Selectable, AutoCloseable { key.interestOps(0); } } catch (IOException | RuntimeException e) { + if (key != null) + immediatelyConnectedKeys.remove(key); + channels.remove(id); socketChannel.close(); throw e; } @@ -316,7 +320,7 @@ public class Selector implements Selectable, AutoCloseable { throw new IllegalStateException("There is already a connection for id " + id + " that is still being closed"); } - private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException { + protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException { SelectionKey key = socketChannel.register(nioSelector, interestedOps); KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key); this.channels.put(id, channel); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 6cf7586..2da1cc6 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -51,6 +51,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -380,16 +381,7 @@ public class SelectorTest { @Test public void testImmediatelyConnectedCleaned() throws Exception { Metrics metrics = new Metrics(); // new metrics object to avoid metric registration conflicts - Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) { - @Override - protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException { - // Use a blocking connect to trigger the immediately connected path - channel.configureBlocking(true); - boolean connected = super.doConnect(channel, address); - channel.configureBlocking(false); - return connected; - } - }; + Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()); try { testImmediatelyConnectedCleaned(selector, true); @@ -400,6 +392,26 @@ public class SelectorTest { } } + private static class ImmediatelyConnectingSelector extends Selector { + public ImmediatelyConnectingSelector(long connectionMaxIdleMS, + Metrics metrics, + Time time, + String metricGrpPrefix, + ChannelBuilder channelBuilder, + LogContext logContext) { + super(connectionMaxIdleMS, metrics, time, metricGrpPrefix, channelBuilder, logContext); + } + + @Override + protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException { + // Use a blocking connect to trigger the immediately connected path + channel.configureBlocking(true); + boolean connected = super.doConnect(channel, address); + channel.configureBlocking(false); + return connected; + } + } + private void testImmediatelyConnectedCleaned(Selector selector, boolean closeAfterFirstPoll) throws Exception { String id = "0"; selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); @@ -412,6 +424,46 @@ public class SelectorTest { verifySelectorEmpty(selector); } + /** + * Verify that if Selector#connect fails and throws an Exception, all related objects + * are cleared immediately before the exception is propagated. + */ + @Test + public void testConnectException() throws Exception { + Metrics metrics = new Metrics(); + AtomicBoolean throwIOException = new AtomicBoolean(); + Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) { + @Override + protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException { + SelectionKey key = super.registerChannel(id, socketChannel, interestedOps); + key.cancel(); + if (throwIOException.get()) + throw new IOException("Test exception"); + return key; + } + }; + + try { + verifyImmediatelyConnectedException(selector, "0"); + throwIOException.set(true); + verifyImmediatelyConnectedException(selector, "1"); + } finally { + selector.close(); + metrics.close(); + } + } + + private void verifyImmediatelyConnectedException(Selector selector, String id) throws Exception { + try { + selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); + fail("Expected exception not thrown"); + } catch (Exception e) { + verifyEmptyImmediatelyConnectedKeys(selector); + assertNull("Channel not removed", selector.channel(id)); + ensureEmptySelectorFields(selector); + } + } + @Test public void testCloseOldestConnectionWithOneStagedReceive() throws Exception { verifyCloseOldestConnectionWithStagedReceives(1); @@ -715,6 +767,10 @@ public class SelectorTest { } selector.poll(0); selector.poll(0); // Poll a second time to clear everything + ensureEmptySelectorFields(selector); + } + + private void ensureEmptySelectorFields(Selector selector) throws Exception { for (Field field : Selector.class.getDeclaredFields()) { ensureEmptySelectorField(selector, field); }