Closing socket if MessageReader fails

This commit is contained in:
Kai S. K. Engelbart 2020-02-09 15:09:00 +01:00
parent f0b007aa75
commit 9fa8686c7c
1 changed files with 35 additions and 32 deletions

View File

@ -5,14 +5,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.*;
/**
* Project: <strong>java-nio-server</strong><br>
@ -59,7 +52,7 @@ public class SocketProcessor implements Runnable {
this.readMessageBuffer = readMessageBuffer;
this.writeMessageBuffer = writeMessageBuffer;
writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue);
writeProxy = new WriteProxy(writeMessageBuffer, outboundMessageQueue);
this.messageReaderFactory = messageReaderFactory;
@ -69,6 +62,7 @@ public class SocketProcessor implements Runnable {
writeSelector = Selector.open();
}
@Override
public void run() {
while (true) {
try {
@ -112,7 +106,7 @@ public class SocketProcessor implements Runnable {
int readReady = readSelector.selectNow();
if (readReady > 0) {
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys();
Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
@ -130,25 +124,34 @@ public class SocketProcessor implements Runnable {
private void readFromSocket(SelectionKey key) throws IOException {
Socket socket = (Socket) key.attachment();
socket.messageReader.read(socket, this.readByteBuffer);
boolean cancelled = false;
List<Message> fullMessages = socket.messageReader.getMessages();
if (fullMessages.size() > 0) {
for (Message message : fullMessages) {
message.socketId = socket.socketId;
messageProcessor.process(message, writeProxy); // the message processor will eventually push outgoing messages into an
// IMessageWriter for this socket.
try {
socket.messageReader.read(socket, readByteBuffer);
List<Message> fullMessages = socket.messageReader.getMessages();
if (fullMessages.size() > 0) {
for (Message message : fullMessages) {
message.socketId = socket.socketId;
// the message processor will eventually push outgoing messages into an
// IMessageWriter for this socket.
messageProcessor.process(message, writeProxy);
}
fullMessages.clear();
}
} catch (IOException e) {
e.printStackTrace();
System.err.println("An exception occurred while reading from a socket. Cancelling socket...");
cancelled = true;
} finally {
if (cancelled || socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId);
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
key.attach(null);
key.cancel();
key.channel().close();
}
fullMessages.clear();
}
if (socket.endOfStreamReached) {
System.out.println("Socket closed: " + socket.socketId);
socketMap.remove(socket.socketId);
socketIdListeners.forEach(l -> l.socketCancelled(socket.socketId));
key.attach(null);
key.cancel();
key.channel().close();
}
}
@ -164,10 +167,10 @@ public class SocketProcessor implements Runnable {
registerNonEmptySockets();
// Select from the Selector.
int writeReady = this.writeSelector.selectNow();
int writeReady = writeSelector.selectNow();
if (writeReady > 0) {
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys();
Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
@ -175,9 +178,9 @@ public class SocketProcessor implements Runnable {
Socket socket = (Socket) key.attachment();
socket.messageWriter.write(socket, this.writeByteBuffer);
socket.messageWriter.write(socket, writeByteBuffer);
if (socket.messageWriter.isEmpty()) { this.nonEmptyToEmptySockets.add(socket); }
if (socket.messageWriter.isEmpty()) { nonEmptyToEmptySockets.add(socket); }
keyIterator.remove();
}
@ -193,7 +196,7 @@ public class SocketProcessor implements Runnable {
private void cancelEmptySockets() {
for (Socket socket : nonEmptyToEmptySockets) {
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector);
SelectionKey key = socket.socketChannel.keyFor(writeSelector);
key.cancel();
}
nonEmptyToEmptySockets.clear();