From 9fa8686c7c8f429cea79fd350f03d1297201e81f Mon Sep 17 00:00:00 2001 From: CyB3RC0nN0R Date: Sun, 9 Feb 2020 15:09:00 +0100 Subject: [PATCH] Closing socket if MessageReader fails --- .../com/jenkov/nioserver/SocketProcessor.java | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/jenkov/nioserver/SocketProcessor.java b/src/main/java/com/jenkov/nioserver/SocketProcessor.java index 27575f2..92ad574 100644 --- a/src/main/java/com/jenkov/nioserver/SocketProcessor.java +++ b/src/main/java/com/jenkov/nioserver/SocketProcessor.java @@ -5,14 +5,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; +import java.util.*; /** * Project: java-nio-server
@@ -59,7 +52,7 @@ public class SocketProcessor implements Runnable { this.readMessageBuffer = readMessageBuffer; this.writeMessageBuffer = writeMessageBuffer; - writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue); + writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue); this.messageReaderFactory = messageReaderFactory; @@ -69,6 +62,7 @@ public class SocketProcessor implements Runnable { writeSelector = Selector.open(); } + @Override public void run() { while (true) { try { @@ -112,7 +106,7 @@ public class SocketProcessor implements Runnable { int readReady = readSelector.selectNow(); if (readReady > 0) { - Set selectedKeys = this.readSelector.selectedKeys(); + Set selectedKeys = readSelector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { @@ -130,25 +124,34 @@ public class SocketProcessor implements Runnable { private void readFromSocket(SelectionKey key) throws IOException { Socket socket = (Socket) key.attachment(); - socket.messageReader.read(socket, this.readByteBuffer); + boolean cancelled = false; - List fullMessages = socket.messageReader.getMessages(); - if (fullMessages.size() > 0) { - for (Message message : fullMessages) { - message.socketId = socket.socketId; - messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an - // IMessageWriter for this socket. + try { + socket.messageReader.read(socket, readByteBuffer); + + List fullMessages = socket.messageReader.getMessages(); + if (fullMessages.size() > 0) { + for (Message message : fullMessages) { + message.socketId = socket.socketId; + // the message processor will eventually push outgoing messages into an + // IMessageWriter for this socket. + messageProcessor.process(message, writeProxy); + } + fullMessages.clear(); + } + } catch (IOException e) { + e.printStackTrace(); + System.err.println("An exception occurred while reading from a socket. Cancelling socket..."); + cancelled = true; + } finally { + if (cancelled || socket.endOfStreamReached) { + System.out.println("Socket closed: " + socket.socketId); + socketMap.remove(socket.socketId); + socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId)); + key.attach(null); + key.cancel(); + key.channel().close(); } - fullMessages.clear(); - } - - if (socket.endOfStreamReached) { - System.out.println("Socket closed: " + socket.socketId); - socketMap.remove(socket.socketId); - socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId)); - key.attach(null); - key.cancel(); - key.channel().close(); } } @@ -164,10 +167,10 @@ public class SocketProcessor implements Runnable { registerNonEmptySockets(); // Select from the Selector. - int writeReady = this.writeSelector.selectNow(); + int writeReady = writeSelector.selectNow(); if (writeReady > 0) { - Set selectionKeys = this.writeSelector.selectedKeys(); + Set selectionKeys = writeSelector.selectedKeys(); Iterator keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { @@ -175,9 +178,9 @@ public class SocketProcessor implements Runnable { Socket socket = (Socket) key.attachment(); - socket.messageWriter.write(socket, this.writeByteBuffer); + socket.messageWriter.write(socket, writeByteBuffer); - if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); } + if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); } keyIterator.remove(); } @@ -193,7 +196,7 @@ public class SocketProcessor implements Runnable { private void cancelEmptySockets() { for (Socket socket : nonEmptyToEmptySockets) { - SelectionKey key = socket.socketChannel.keyFor(this.writeSelector); + SelectionKey key = socket.socketChannel.keyFor(writeSelector); key.cancel(); } nonEmptyToEmptySockets.clear();