This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 46e8081 KAFKA-7712; Remove channel from Selector before propagating exception (#6023)
46e8081 is described below
commit 46e8081f9cae24f5edbfa2099e317a33e531d2fb
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Wed Dec 12 18:50:56 2018 +0000
KAFKA-7712; Remove channel from Selector before propagating exception (#6023)
Ensure that channel and selection keys are removed from `Selector` collections before
propagating connect exceptions. They are currently cleared on the next `poll()`, but we can't
ensure that callers (NetworkClient for example) won't try to connect again before the next
`poll` and hence we should clear the collections before re-throwing exceptions from `connect()`.
Reviewers: Jason Gustafson <jason@confluent.io>
---
.../org/apache/kafka/common/network/Selector.java | 8 ++-
.../apache/kafka/common/network/SelectorTest.java | 76 +++++++++++++++++++---
2 files changed, 72 insertions(+), 12 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 843d46d..8c46746 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -250,10 +250,11 @@ public class Selector implements Selectable, AutoCloseable {
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
throws IOException {
ensureNotRegistered(id);
SocketChannel socketChannel = SocketChannel.open();
+ SelectionKey key = null;
try {
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
boolean connected = doConnect(socketChannel, address);
- SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
+ key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
@@ -262,6 +263,9 @@ public class Selector implements Selectable, AutoCloseable {
key.interestOps(0);
}
} catch (IOException | RuntimeException e) {
+ if (key != null)
+ immediatelyConnectedKeys.remove(key);
+ channels.remove(id);
socketChannel.close();
throw e;
}
@@ -316,7 +320,7 @@ public class Selector implements Selectable, AutoCloseable {
throw new IllegalStateException("There is already a connection for id " + id
+ " that is still being closed");
}
- private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps)
throws IOException {
+ protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps)
throws IOException {
SelectionKey key = socketChannel.register(nioSelector, interestedOps);
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
this.channels.put(id, channel);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 6cf7586..2da1cc6 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -51,6 +51,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -380,16 +381,7 @@ public class SelectorTest {
@Test
public void testImmediatelyConnectedCleaned() throws Exception {
Metrics metrics = new Metrics(); // new metrics object to avoid metric registration conflicts
- Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder,
new LogContext()) {
- @Override
- protected boolean doConnect(SocketChannel channel, InetSocketAddress address)
throws IOException {
- // Use a blocking connect to trigger the immediately connected path
- channel.configureBlocking(true);
- boolean connected = super.doConnect(channel, address);
- channel.configureBlocking(false);
- return connected;
- }
- };
+ Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup",
channelBuilder, new LogContext());
try {
testImmediatelyConnectedCleaned(selector, true);
@@ -400,6 +392,26 @@ public class SelectorTest {
}
}
+ private static class ImmediatelyConnectingSelector extends Selector {
+ public ImmediatelyConnectingSelector(long connectionMaxIdleMS,
+ Metrics metrics,
+ Time time,
+ String metricGrpPrefix,
+ ChannelBuilder channelBuilder,
+ LogContext logContext) {
+ super(connectionMaxIdleMS, metrics, time, metricGrpPrefix, channelBuilder, logContext);
+ }
+
+ @Override
+ protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws
IOException {
+ // Use a blocking connect to trigger the immediately connected path
+ channel.configureBlocking(true);
+ boolean connected = super.doConnect(channel, address);
+ channel.configureBlocking(false);
+ return connected;
+ }
+ }
+
private void testImmediatelyConnectedCleaned(Selector selector, boolean closeAfterFirstPoll)
throws Exception {
String id = "0";
selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE,
BUFFER_SIZE);
@@ -412,6 +424,46 @@ public class SelectorTest {
verifySelectorEmpty(selector);
}
+ /**
+ * Verify that if Selector#connect fails and throws an Exception, all related objects
+ * are cleared immediately before the exception is propagated.
+ */
+ @Test
+ public void testConnectException() throws Exception {
+ Metrics metrics = new Metrics();
+ AtomicBoolean throwIOException = new AtomicBoolean();
+ Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup",
channelBuilder, new LogContext()) {
+ @Override
+ protected SelectionKey registerChannel(String id, SocketChannel socketChannel,
int interestedOps) throws IOException {
+ SelectionKey key = super.registerChannel(id, socketChannel, interestedOps);
+ key.cancel();
+ if (throwIOException.get())
+ throw new IOException("Test exception");
+ return key;
+ }
+ };
+
+ try {
+ verifyImmediatelyConnectedException(selector, "0");
+ throwIOException.set(true);
+ verifyImmediatelyConnectedException(selector, "1");
+ } finally {
+ selector.close();
+ metrics.close();
+ }
+ }
+
+ private void verifyImmediatelyConnectedException(Selector selector, String id) throws
Exception {
+ try {
+ selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE,
BUFFER_SIZE);
+ fail("Expected exception not thrown");
+ } catch (Exception e) {
+ verifyEmptyImmediatelyConnectedKeys(selector);
+ assertNull("Channel not removed", selector.channel(id));
+ ensureEmptySelectorFields(selector);
+ }
+ }
+
@Test
public void testCloseOldestConnectionWithOneStagedReceive() throws Exception {
verifyCloseOldestConnectionWithStagedReceives(1);
@@ -715,6 +767,10 @@ public class SelectorTest {
}
selector.poll(0);
selector.poll(0); // Poll a second time to clear everything
+ ensureEmptySelectorFields(selector);
+ }
+
+ private void ensureEmptySelectorFields(Selector selector) throws Exception {
for (Field field : Selector.class.getDeclaredFields()) {
ensureEmptySelectorField(selector, field);
}
|