kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2813; selector doesn't close socket connection on non-IOExceptions
Date Thu, 12 Nov 2015 06:18:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk df88d3be7 -> 3fd168d95


KAFKA-2813; selector doesn't close socket connection on non-IOExceptions

Patched Selector.poll() to close the connection on any exception.

Author: Jun Rao <junrao@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Gwen Shapira <cshapi@gmail.com>

Closes #501 from junrao/KAFKA-2813


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3fd168d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3fd168d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3fd168d9

Branch: refs/heads/trunk
Commit: 3fd168d9522d1ad25f5582fbe838cea15bdb525f
Parents: df88d3b
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Nov 11 22:18:19 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Nov 11 22:18:19 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/network/Selector.java   | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3fd168d9/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 34de616..639a2be 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -241,7 +241,6 @@ public class Selector implements Selectable {
      * @throws IllegalArgumentException If `timeout` is negative
      * @throws IllegalStateException If a send is given for which we have no existing connection
or for which there is
      *         already an in-progress send
-     * @throws InvalidReceiveException If invalid data is received
      */
     @Override
     public void poll(long timeout) throws IOException {
@@ -284,16 +283,8 @@ public class Selector implements Selectable {
                     /* if channel is ready read from any connections that have readable data
*/
                     if (channel.ready() && key.isReadable() && !hasStagedReceive(channel))
{
                         NetworkReceive networkReceive;
-                        try {
-                            while ((networkReceive = channel.read()) != null) {
-                                addToStagedReceives(channel, networkReceive);
-                            }
-                        } catch (InvalidReceiveException e) {
-                            log.error("Invalid data received from " + channel.id() + " closing
connection", e);
-                            close(channel);
-                            this.disconnected.add(channel.id());
-                            throw e;
-                        }
+                        while ((networkReceive = channel.read()) != null)
+                            addToStagedReceives(channel, networkReceive);
                     }
 
                     /* if channel is ready write to any sockets that have space in their
buffer and for which we have data */
@@ -310,9 +301,12 @@ public class Selector implements Selectable {
                         close(channel);
                         this.disconnected.add(channel.id());
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     String desc = channel.socketDescription();
-                    log.debug("Connection with {} disconnected", desc, e);
+                    if (e instanceof IOException)
+                        log.debug("Connection with {} disconnected", desc, e);
+                    else
+                        log.warn("Unexpected error from {}; closing connection", desc, e);
                     close(channel);
                     this.disconnected.add(channel.id());
                 }


Mime
View raw message