kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject kafka git commit: KAFKA-6300; SelectorTest may fail with ConcurrentModificationException
Date Wed, 06 Dec 2017 19:42:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 d4f006995 -> 305e7949c


KAFKA-6300; SelectorTest may fail with ConcurrentModificationException

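Access to the sockets ArrayList is now synchronized, both where the accept loop adds a socket and where closeConnections() iterates over the list, to avoid a ConcurrentModificationException.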

Author: tedyu <yuzhihong@gmail.com>

Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #4299 from tedyu/trunk

(cherry picked from commit b6909339876ca1cbcce28043c4b42f2c7e48307b)
Signed-off-by: Rajini Sivaram <rajinisivaram@googlemail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/305e7949
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/305e7949
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/305e7949

Branch: refs/heads/1.0
Commit: 305e7949c5f7edc5c9639df50ee5bb797493c7e8
Parents: d4f0069
Author: tedyu <yuzhihong@gmail.com>
Authored: Wed Dec 6 19:43:29 2017 +0000
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Wed Dec 6 19:44:13 2017 +0000

----------------------------------------------------------------------
 .../apache/kafka/common/network/EchoServer.java | 69 +++++++++++---------
 1 file changed, 39 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/305e7949/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
index aa7a15e..e986598 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -41,6 +41,7 @@ class EchoServer extends Thread {
     private final ServerSocket serverSocket;
     private final List<Thread> threads;
     private final List<Socket> sockets;
+    private volatile boolean closing = false;
     private final SslFactory sslFactory;
     private final AtomicBoolean renegotiate = new AtomicBoolean();
 
@@ -71,40 +72,45 @@ class EchoServer extends Thread {
     @Override
     public void run() {
         try {
-            while (true) {
+            while (!closing) {
                 final Socket socket = serverSocket.accept();
-                sockets.add(socket);
-                Thread thread = new Thread() {
-                    @Override
-                    public void run() {
-                        try {
-                            DataInputStream input = new DataInputStream(socket.getInputStream());
-                            DataOutputStream output = new DataOutputStream(socket.getOutputStream());
-                            while (socket.isConnected() && !socket.isClosed()) {
-                                int size = input.readInt();
-                                if (renegotiate.get()) {
-                                    renegotiate.set(false);
-                                    ((SSLSocket) socket).startHandshake();
-                                }
-                                byte[] bytes = new byte[size];
-                                input.readFully(bytes);
-                                output.writeInt(size);
-                                output.write(bytes);
-                                output.flush();
-                            }
-                        } catch (IOException e) {
-                            // ignore
-                        } finally {
+                synchronized (sockets) {
+                    if (closing) {
+                        break;
+                    }
+                    sockets.add(socket);
+                    Thread thread = new Thread() {
+                        @Override
+                        public void run() {
                             try {
-                                socket.close();
+                                DataInputStream input = new DataInputStream(socket.getInputStream());
+                                DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+                                while (socket.isConnected() && !socket.isClosed()) {
+                                    int size = input.readInt();
+                                    if (renegotiate.get()) {
+                                        renegotiate.set(false);
+                                        ((SSLSocket) socket).startHandshake();
+                                    }
+                                    byte[] bytes = new byte[size];
+                                    input.readFully(bytes);
+                                    output.writeInt(size);
+                                    output.write(bytes);
+                                    output.flush();
+                                }
                             } catch (IOException e) {
                                 // ignore
+                            } finally {
+                                try {
+                                    socket.close();
+                                } catch (IOException e) {
+                                    // ignore
+                                }
                             }
                         }
-                    }
-                };
-                thread.start();
-                threads.add(thread);
+                    };
+                    thread.start();
+                    threads.add(thread);
+                }
             }
         } catch (IOException e) {
             // ignore
@@ -112,11 +118,14 @@ class EchoServer extends Thread {
     }
 
     public void closeConnections() throws IOException {
-        for (Socket socket : sockets)
-            socket.close();
+        synchronized (sockets) {
+            for (Socket socket : sockets)
+                socket.close();
+        }
     }
 
     public void close() throws IOException, InterruptedException {
+        closing = true;
         this.serverSocket.close();
         closeConnections();
         for (Thread t : threads)


Mime
View raw message