diff --git a/src/main/java/com/jenkov/nioserver/IMessageWriter.java b/src/main/java/com/jenkov/nioserver/IMessageWriter.java deleted file mode 100644 index 5fe57cc..0000000 --- a/src/main/java/com/jenkov/nioserver/IMessageWriter.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.jenkov.nioserver; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.Selector; - -/** - * Created by jjenkov on 16-10-2015. - */ -public interface IMessageWriter { - - /** - * Called by IMessageProcessor - so no access to write Selector from here. - * - * @param message - */ - public void enqueue(Message message); - - public void write(Socket socket, ByteBuffer byteBuffer) throws IOException; - - public boolean isEmpty(); - -} diff --git a/src/main/java/com/jenkov/nioserver/IMessageWriterFactory.java b/src/main/java/com/jenkov/nioserver/IMessageWriterFactory.java deleted file mode 100644 index 255af93..0000000 --- a/src/main/java/com/jenkov/nioserver/IMessageWriterFactory.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.jenkov.nioserver; - -import java.nio.channels.Selector; - -/** - * Created by jjenkov on 16-10-2015. - */ -public interface IMessageWriterFactory { - - public IMessageWriter createMessageWriter(); - -} diff --git a/src/main/java/com/jenkov/nioserver/MessageWriter.java b/src/main/java/com/jenkov/nioserver/MessageWriter.java index 49fdc15..4111b8a 100644 --- a/src/main/java/com/jenkov/nioserver/MessageWriter.java +++ b/src/main/java/com/jenkov/nioserver/MessageWriter.java @@ -8,7 +8,7 @@ import java.util.List; /** * Created by jjenkov on 21-10-2015. */ -public class MessageWriter implements IMessageWriter { +public class MessageWriter { private List writeQueue = new ArrayList<>(); private Message messageInProgress = null; @@ -17,25 +17,15 @@ public class MessageWriter implements IMessageWriter { public MessageWriter() { } - @Override public void enqueue(Message message) { if(this.messageInProgress == null){ this.messageInProgress = message; - System.out.println("Message set as message in progress."); } else { this.writeQueue.add(message); - System.out.println("Message enqueued."); } - - //todo register socket for write interest - } - - @Override public void write(Socket socket, ByteBuffer byteBuffer) throws IOException { - System.out.println("Writing message to socket"); - byteBuffer.put(this.messageInProgress.sharedArray, this.messageInProgress.offset + this.bytesWritten, this.messageInProgress.length - this.bytesWritten); byteBuffer.flip(); @@ -52,8 +42,8 @@ public class MessageWriter implements IMessageWriter { } } - @Override public boolean isEmpty() { return this.writeQueue.isEmpty() && this.messageInProgress == null; } + } diff --git a/src/main/java/com/jenkov/nioserver/ServerCore.java b/src/main/java/com/jenkov/nioserver/ServerCore.java index bb2fd91..268c883 100644 --- a/src/main/java/com/jenkov/nioserver/ServerCore.java +++ b/src/main/java/com/jenkov/nioserver/ServerCore.java @@ -12,15 +12,16 @@ import java.util.*; */ public class ServerCore implements Runnable { - private MessageBuffer readMessageBuffer = null; - private MessageBuffer writeMessageBuffer = null; - private Queue inboundSocketQueue = null; - private Queue outboundMessageQueue = new LinkedList<>(); //todo use a better / faster queue. + private Queue inboundSocketQueue = null; - private Map socketMap = new HashMap<>(); + private MessageBuffer readMessageBuffer = null; //todo Not used now - but perhaps will be later - to check for space in the buffer before reading from sockets + private MessageBuffer writeMessageBuffer = null; //todo Not used now - but perhaps will be later - to check for space in the buffer before reading from sockets (space for more to write?) private IMessageReaderFactory messageReaderFactory = null; - private IMessageWriterFactory messageWriterFactory = null; + + private Queue outboundMessageQueue = new LinkedList<>(); //todo use a better / faster queue. + + private Map socketMap = new HashMap<>(); private ByteBuffer readByteBuffer = ByteBuffer.allocate(1024 * 1024); private ByteBuffer writeByteBuffer = ByteBuffer.allocate(1024 * 1024); @@ -36,7 +37,7 @@ public class ServerCore implements Runnable { private Set nonEmptyToEmptySockets = new HashSet<>(); - public ServerCore(Queue inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageWriterFactory messageWriterFactory, IMessageProcessor messageProcessor) throws IOException { + public ServerCore(Queue inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException { this.inboundSocketQueue = inboundSocketQueue; this.readMessageBuffer = readMessageBuffer; @@ -44,7 +45,6 @@ public class ServerCore implements Runnable { this.writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue); this.messageReaderFactory = messageReaderFactory; - this.messageWriterFactory = messageWriterFactory; this.messageProcessor = messageProcessor; @@ -84,7 +84,7 @@ public class ServerCore implements Runnable { newSocket.socketChannel.configureBlocking(false); newSocket.messageReader = this.messageReaderFactory.createMessageReader(); - newSocket.messageWriter = this.messageWriterFactory.createMessageWriter(); + newSocket.messageWriter = new MessageWriter(); this.socketMap.put(newSocket.socketId, newSocket); @@ -128,6 +128,7 @@ public class ServerCore implements Runnable { } if(socket.endOfStreamReached){ + System.out.println("Socket closed: " + socket.socketId); this.socketMap.remove(socket.socketId); key.attach(null); key.cancel(); @@ -147,9 +148,7 @@ public class ServerCore implements Runnable { // Register all sockets that *have* data and which are not yet registered. registerNonEmptySockets(); - // Select from the Selector. - int writeReady = this.writeSelector.selectNow(); if(writeReady > 0){ @@ -185,6 +184,7 @@ public class ServerCore implements Runnable { private void cancelEmptySockets() { for(Socket socket : nonEmptyToEmptySockets){ SelectionKey key = socket.socketChannel.keyFor(this.writeSelector); + key.cancel(); } nonEmptyToEmptySockets.clear(); @@ -196,7 +196,7 @@ public class ServerCore implements Runnable { Socket socket = this.socketMap.get(outMessage.socketId); if(socket != null){ - IMessageWriter messageWriter = socket.messageWriter; + MessageWriter messageWriter = socket.messageWriter; if(messageWriter.isEmpty()){ messageWriter.enqueue(outMessage); nonEmptyToEmptySockets.remove(socket); diff --git a/src/main/java/com/jenkov/nioserver/Socket.java b/src/main/java/com/jenkov/nioserver/Socket.java index 00b084c..6d33943 100644 --- a/src/main/java/com/jenkov/nioserver/Socket.java +++ b/src/main/java/com/jenkov/nioserver/Socket.java @@ -11,9 +11,9 @@ public class Socket { public long socketId; - public SocketChannel socketChannel = null; + public SocketChannel socketChannel = null; public IMessageReader messageReader = null; - public IMessageWriter messageWriter = null; + public MessageWriter messageWriter = null; public boolean endOfStreamReached = false; diff --git a/src/main/java/com/jenkov/nioserver/SocketSwitch.java b/src/main/java/com/jenkov/nioserver/SocketSwitch.java deleted file mode 100644 index f34c699..0000000 --- a/src/main/java/com/jenkov/nioserver/SocketSwitch.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.jenkov.nioserver; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; - -/** - * - * Todo - rename class to SocketWriter ? - * - * Created by jjenkov on 20-10-2015. - */ -public class SocketSwitch { - - private Map socketMap = new HashMap<>(); - private Map messageWriterMap = new HashMap<>(); - - private Set socketsToCancel = new HashSet<>(); - private Set socketsToRegister = new HashSet<>(); - - - public Set getSocketsToCancel() { - return this.socketsToCancel; - } - - public Set getSocketsToRegister() { - return this.socketsToRegister; - } - - //todo change putMessageWriter to registerSocket(); - public void registerSocket(Socket socket){ - System.out.println("Registering socket: " + socket.socketId ); - this.socketMap.put(socket.socketId, socket); - this.messageWriterMap.put(socket.socketId, socket.messageWriter); - } - - public void enqueueMessage(long socketId, Message message){ - IMessageWriter messageWriter = this.messageWriterMap.get(socketId); - - - - if(messageWriter != null){ - boolean wasEmpty = messageWriter.isEmpty(); - messageWriter.enqueue(message); - - if(wasEmpty){ - Socket socket = this.socketMap.get(socketId); - - if(this.socketsToCancel.contains(socket)){ - //Socket already registered with Selector. Remove cancellation request but - // do not re-register socket with Selector - this.socketsToCancel.remove(socket); - } else { - //Socket not currently registered with Selector. Request registration. - this.socketsToRegister.add(socket); - } - } - } - } - - public void writeToSocket(Socket socket, ByteBuffer writeByteBuffer) throws IOException { - IMessageWriter messageWriter = messageWriterMap.get(socket.socketId); - if(messageWriter != null){ - messageWriter.write(socket, writeByteBuffer); - - if(messageWriter.isEmpty()) { - this.socketsToCancel.add(socket); - } - } - - } - - public void unregisterSocket(long socketId){ - System.out.println("Unregistering socket: " + socketId); - this.socketMap.remove(socketId); - - //todo in case the message writer should survive a "missing connection" - don't remove it. - this.messageWriterMap.remove(socketId); - } - - public void removeMessageWriter(long id){ - this.socketMap.remove(id); - } - - - -} diff --git a/src/main/java/com/jenkov/nioserver/example/Main.java b/src/main/java/com/jenkov/nioserver/example/Main.java index eaefa2c..8182fa6 100644 --- a/src/main/java/com/jenkov/nioserver/example/Main.java +++ b/src/main/java/com/jenkov/nioserver/example/Main.java @@ -2,7 +2,6 @@ package com.jenkov.nioserver.example; import com.jenkov.nioserver.*; import com.jenkov.nioserver.http.HttpMessageReaderFactory; -import com.jenkov.nioserver.http.HttpMessageWriterFactory; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; @@ -14,7 +13,6 @@ import java.util.concurrent.BlockingQueue; public class Main { public static void main(String[] args) throws IOException { - BlockingQueue inboundSocketQueue = new ArrayBlockingQueue(1024); Server server = new Server(9999, inboundSocketQueue); @@ -22,11 +20,9 @@ public class Main { Thread serverThread = new Thread(server); serverThread.start(); - MessageBuffer readMessageBuffer = new MessageBuffer(); MessageBuffer writeMessageBuffer = new MessageBuffer(); IMessageReaderFactory messageReaderFactory = new HttpMessageReaderFactory(readMessageBuffer); - IMessageWriterFactory messageWriterFactory = new HttpMessageWriterFactory(); String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + @@ -46,7 +42,7 @@ public class Main { writeProxy.enqueue(response); }; - ServerCore serverCore = new ServerCore(inboundSocketQueue, readMessageBuffer, writeMessageBuffer, messageReaderFactory, messageWriterFactory, messageProcessor); + ServerCore serverCore = new ServerCore(inboundSocketQueue, readMessageBuffer, writeMessageBuffer, messageReaderFactory, messageProcessor); Thread serverCoreThread = new Thread(serverCore); serverCoreThread.start(); } diff --git a/src/main/java/com/jenkov/nioserver/http/HttpMessageWriter.java b/src/main/java/com/jenkov/nioserver/http/HttpMessageWriter.java deleted file mode 100644 index 32df192..0000000 --- a/src/main/java/com/jenkov/nioserver/http/HttpMessageWriter.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.jenkov.nioserver.http; - -import com.jenkov.nioserver.IMessageWriter; -import com.jenkov.nioserver.Message; -import com.jenkov.nioserver.SocketSwitch; -import com.jenkov.nioserver.Socket; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -/** - * Created by jjenkov on 21-10-2015. - */ -public class HttpMessageWriter implements IMessageWriter { - - private List writeQueue = new ArrayList<>(); - private Message messageInProgress = null; - private int bytesWritten = 0; - - public HttpMessageWriter() { - } - - @Override - public void enqueue(Message message) { - if(this.messageInProgress == null){ - this.messageInProgress = message; - System.out.println("Message set as message in progress."); - } else { - this.writeQueue.add(message); - System.out.println("Message enqueued."); - } - - //todo register socket for write interest - - } - - - @Override - public void write(Socket socket, ByteBuffer byteBuffer) throws IOException { - System.out.println("Writing message to socket"); - - byteBuffer.put(this.messageInProgress.sharedArray, this.messageInProgress.offset + this.bytesWritten, this.messageInProgress.length - this.bytesWritten); - byteBuffer.flip(); - - this.bytesWritten += socket.write(byteBuffer); - byteBuffer.clear(); - - if(bytesWritten >= this.messageInProgress.length){ - if(this.writeQueue.size() > 0){ - this.messageInProgress = this.writeQueue.remove(0); - } else { - this.messageInProgress = null; - //todo unregister from selector - } - } - } - - @Override - public boolean isEmpty() { - return this.writeQueue.isEmpty() && this.messageInProgress == null; - } -} diff --git a/src/main/java/com/jenkov/nioserver/http/HttpMessageWriterFactory.java b/src/main/java/com/jenkov/nioserver/http/HttpMessageWriterFactory.java deleted file mode 100644 index f4e3966..0000000 --- a/src/main/java/com/jenkov/nioserver/http/HttpMessageWriterFactory.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.jenkov.nioserver.http; - -import com.jenkov.nioserver.IMessageWriter; -import com.jenkov.nioserver.IMessageWriterFactory; - -import java.nio.channels.Selector; - -/** - * Created by jjenkov on 19-10-2015. - */ -public class HttpMessageWriterFactory implements IMessageWriterFactory { - - @Override - public IMessageWriter createMessageWriter() { - return new HttpMessageWriter(); - //return null; - } -}