diff --git a/src/main/java/com/jenkov/nioserver/IMessageProcessor.java b/src/main/java/com/jenkov/nioserver/IMessageProcessor.java new file mode 100644 index 0000000..3d5e9b9 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/IMessageProcessor.java @@ -0,0 +1,10 @@ +package com.jenkov.nioserver; + +/** + * Created by jjenkov on 16-10-2015. + */ +public interface IMessageProcessor { + + public void process(Message message, WriteProxy writeProxy); + +} diff --git a/src/main/java/com/jenkov/nioserver/IMessageReader.java b/src/main/java/com/jenkov/nioserver/IMessageReader.java new file mode 100644 index 0000000..322b20c --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/IMessageReader.java @@ -0,0 +1,18 @@ +package com.jenkov.nioserver; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Created by jjenkov on 16-10-2015. + */ +public interface IMessageReader { + + public void read(Socket socket, ByteBuffer byteBuffer) throws IOException; + + public List getMessages(); + + + +} diff --git a/src/main/java/com/jenkov/nioserver/IMessageReaderFactory.java b/src/main/java/com/jenkov/nioserver/IMessageReaderFactory.java new file mode 100644 index 0000000..0019f48 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/IMessageReaderFactory.java @@ -0,0 +1,9 @@ +package com.jenkov.nioserver; + +/** + * Created by jjenkov on 16-10-2015. + */ +public interface IMessageReaderFactory { + + public IMessageReader createMessageReader(); +} diff --git a/src/main/java/com/jenkov/nioserver/IMessageWriter.java b/src/main/java/com/jenkov/nioserver/IMessageWriter.java new file mode 100644 index 0000000..5fe57cc --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/IMessageWriter.java @@ -0,0 +1,23 @@ +package com.jenkov.nioserver; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Selector; + +/** + * Created by jjenkov on 16-10-2015. + */ +public interface IMessageWriter { + + /** + * Called by IMessageProcessor - so no access to write Selector from here. + * + * @param message + */ + public void enqueue(Message message); + + public void write(Socket socket, ByteBuffer byteBuffer) throws IOException; + + public boolean isEmpty(); + +} diff --git a/src/main/java/com/jenkov/nioserver/IMessageWriterFactory.java b/src/main/java/com/jenkov/nioserver/IMessageWriterFactory.java new file mode 100644 index 0000000..255af93 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/IMessageWriterFactory.java @@ -0,0 +1,12 @@ +package com.jenkov.nioserver; + +import java.nio.channels.Selector; + +/** + * Created by jjenkov on 16-10-2015. + */ +public interface IMessageWriterFactory { + + public IMessageWriter createMessageWriter(); + +} diff --git a/src/main/java/com/jenkov/nioserver/Message.java b/src/main/java/com/jenkov/nioserver/Message.java new file mode 100644 index 0000000..d11579a --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/Message.java @@ -0,0 +1,105 @@ +package com.jenkov.nioserver; + +import java.nio.ByteBuffer; + +/** + * Created by jjenkov on 16-10-2015. + */ +public class Message { + + private MessageBuffer messageBuffer = null; + + public long socketId = 0; // the id of source socket or destination socket, depending on whether is going in or out. + + public byte[] sharedArray = null; + public int offset = 0; //offset into sharedArray where this message data starts. + public int capacity = 0; //the size of the section in the sharedArray allocated to this message. + public int length = 0; //the number of bytes used of the allocated section. + + public Object metaData = null; + + public Message(MessageBuffer messageBuffer) { + this.messageBuffer = messageBuffer; + } + + /** + * Writes data from the ByteBuffer into this message - meaning into the buffer backing this message. + * + * @param byteBuffer The ByteBuffer containing the message data to write. + * @return + */ + public int writeToMessage(ByteBuffer byteBuffer){ + int remaining = byteBuffer.remaining(); + + while(this.length + remaining > capacity){ + if(!this.messageBuffer.expandMessage(this)) { + return -1; + } + } + + int bytesToCopy = Math.min(remaining, this.capacity - this.length); + byteBuffer.get(this.sharedArray, this.offset + this.length, bytesToCopy); + this.length += bytesToCopy; + + return bytesToCopy; + } + + + + + /** + * Writes data from the byte array into this message - meaning into the buffer backing this message. + * + * @param byteArray The byte array containing the message data to write. + * @return + */ + public int writeToMessage(byte[] byteArray){ + return writeToMessage(byteArray, 0, byteArray.length); + } + + + /** + * Writes data from the byte array into this message - meaning into the buffer backing this message. + * + * @param byteArray The byte array containing the message data to write. + * @return + */ + public int writeToMessage(byte[] byteArray, int offset, int length){ + int remaining = length; + + while(this.length + remaining > capacity){ + if(!this.messageBuffer.expandMessage(this)) { + return -1; + } + } + + int bytesToCopy = Math.min(remaining, this.capacity - this.length); + System.arraycopy(byteArray, offset, this.sharedArray, this.offset + this.length, bytesToCopy); + this.length += bytesToCopy; + return bytesToCopy; + } + + + + + /** + * In case the buffer backing the nextMessage contains more than one HTTP message, move all data after the first + * message to a new Message object. + * + * @param message The message containing the partial message (after the first message). + * @param endIndex The end index of the first message in the buffer of the message given as parameter. + */ + public void writePartialMessageToMessage(Message message, int endIndex){ + int startIndexOfPartialMessage = message.offset + endIndex; + int lengthOfPartialMessage = (message.offset + message.length) - endIndex; + + System.arraycopy(message.sharedArray, startIndexOfPartialMessage, this.sharedArray, this.offset, lengthOfPartialMessage); + } + + public int writeToByteBuffer(ByteBuffer byteBuffer){ + return 0; + } + + + +} diff --git a/src/main/java/com/jenkov/nioserver/MessageBuffer.java b/src/main/java/com/jenkov/nioserver/MessageBuffer.java new file mode 100644 index 0000000..bf78a54 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/MessageBuffer.java @@ -0,0 +1,88 @@ +package com.jenkov.nioserver; + +/** + * A shared buffer which can contain many messages inside. A message gets a section of the buffer to use. If the + * message outgrows the section in size, the message requests a larger section and the message is copied to that + * larger section. The smaller section is then freed again. + * + * + * Created by jjenkov on 18-10-2015. + */ +public class MessageBuffer { + + public static int KB = 1024; + public static int MB = 1024 * KB; + + private static final int CAPACITY_SMALL = 4 * KB; + private static final int CAPACITY_MEDIUM = 128 * KB; + private static final int CAPACITY_LARGE = 1024 * KB; + + //package scope (default) - so they can be accessed from unit tests. + byte[] smallMessageBuffer = new byte[1024 * 4 * KB]; //1024 x 4KB messages = 4MB. + byte[] mediumMessageBuffer = new byte[128 * 128 * KB]; // 128 x 128KB messages = 16MB. + byte[] largeMessageBuffer = new byte[16 * 1 * MB]; // 16 * 1MB messages = 16MB. + + QueueIntFlip smallMessageBufferFreeBlocks = new QueueIntFlip(1024); // 1024 free sections + QueueIntFlip mediumMessageBufferFreeBlocks = new QueueIntFlip(128); // 128 free sections + QueueIntFlip largeMessageBufferFreeBlocks = new QueueIntFlip(16); // 16 free sections + + //todo make all message buffer capacities and block sizes configurable + //todo calculate free block queue sizes based on capacity and block size of buffers. + + public MessageBuffer() { + //add all free sections to all free section queues. + for(int i=0; i writeQueue = new ArrayList<>(); + private Message messageInProgress = null; + private int bytesWritten = 0; + + public MessageWriter() { + } + + @Override + public void enqueue(Message message) { + if(this.messageInProgress == null){ + this.messageInProgress = message; + System.out.println("Message set as message in progress."); + } else { + this.writeQueue.add(message); + System.out.println("Message enqueued."); + } + + //todo register socket for write interest + + } + + + @Override + public void write(Socket socket, ByteBuffer byteBuffer) throws IOException { + System.out.println("Writing message to socket"); + + byteBuffer.put(this.messageInProgress.sharedArray, this.messageInProgress.offset + this.bytesWritten, this.messageInProgress.length - this.bytesWritten); + byteBuffer.flip(); + + this.bytesWritten += socket.write(byteBuffer); + byteBuffer.clear(); + + if(bytesWritten >= this.messageInProgress.length){ + if(this.writeQueue.size() > 0){ + this.messageInProgress = this.writeQueue.remove(0); + } else { + this.messageInProgress = null; + //todo unregister from selector + } + } + } + + @Override + public boolean isEmpty() { + return this.writeQueue.isEmpty() && this.messageInProgress == null; + } +} diff --git a/src/main/java/com/jenkov/nioserver/QueueIntFlip.java b/src/main/java/com/jenkov/nioserver/QueueIntFlip.java new file mode 100644 index 0000000..ca1efd7 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/QueueIntFlip.java @@ -0,0 +1,186 @@ +package com.jenkov.nioserver; + +/** + * Same as QueueFillCount, except that QueueFlip uses a flip flag to keep track of when the internal writePos has + * "overflowed" (meaning it goes back to 0). Other than that, the two implementations are very similar in functionality. + * + * One additional difference is that QueueFlip has an available() method, where this is a public variable in + * QueueFillCount. + * + * Created by jjenkov on 18-09-2015. + */ +public class QueueIntFlip { + + public int[] elements = null; + + public int capacity = 0; + public int writePos = 0; + public int readPos = 0; + public boolean flipped = false; + + public QueueIntFlip(int capacity) { + this.capacity = capacity; + this.elements = new int[capacity]; //todo get from TypeAllocator ? + } + + public void reset() { + this.writePos = 0; + this.readPos = 0; + this.flipped = false; + } + + public int available() { + if(!flipped){ + return writePos - readPos; + } + return capacity - readPos + writePos; + } + + public int remainingCapacity() { + if(!flipped){ + return capacity - writePos; + } + return readPos - writePos; + } + + public boolean put(int element){ + if(!flipped){ + if(writePos == capacity){ + writePos = 0; + flipped = true; + + if(writePos < readPos){ + elements[writePos++] = element; + return true; + } else { + return false; + } + } else { + elements[writePos++] = element; + return true; + } + } else { + if(writePos < readPos ){ + elements[writePos++] = element; + return true; + } else { + return false; + } + } + } + + public int put(int[] newElements, int length){ + int newElementsReadPos = 0; + if(!flipped){ + //readPos lower than writePos - free sections are: + //1) from writePos to capacity + //2) from 0 to readPos + + if(length <= capacity - writePos){ + //new elements fit into top of elements array - copy directly + for(; newElementsReadPos < length; newElementsReadPos++){ + this.elements[this.writePos++] = newElements[newElementsReadPos]; + } + + return newElementsReadPos; + } else { + //new elements must be divided between top and bottom of elements array + + //writing to top + for(;this.writePos < capacity; this.writePos++){ + this.elements[this.writePos] = newElements[newElementsReadPos++]; + } + + //writing to bottom + this.writePos = 0; + this.flipped = true; + int endPos = Math.min(this.readPos, length - newElementsReadPos); + for(; this.writePos < endPos; this.writePos++){ + this.elements[writePos] = newElements[newElementsReadPos++]; + } + + + return newElementsReadPos; + } + + } else { + //readPos higher than writePos - free sections are: + //1) from writePos to readPos + + int endPos = Math.min(this.readPos, this.writePos + length); + + for(; this.writePos < endPos; this.writePos++){ + this.elements[this.writePos] = newElements[newElementsReadPos++]; + } + + return newElementsReadPos; + } + } + + + public int take() { + if(!flipped){ + if(readPos < writePos){ + return elements[readPos++]; + } else { + return -1; + } + } else { + if(readPos == capacity){ + readPos = 0; + flipped = false; + + if(readPos < writePos){ + return elements[readPos++]; + } else { + return -1; + } + } else { + return elements[readPos++]; + } + } + } + + public int take(int[] into, int length){ + int intoWritePos = 0; + if(!flipped){ + //writePos higher than readPos - available section is writePos - readPos + + int endPos = Math.min(this.writePos, this.readPos + length); + for(; this.readPos < endPos; this.readPos++){ + into[intoWritePos++] = this.elements[this.readPos]; + } + return intoWritePos; + } else { + //readPos higher than writePos - available sections are top + bottom of elements array + + if(length <= capacity - readPos){ + //length is lower than the elements available at the top of the elements array - copy directly + for(; intoWritePos < length; intoWritePos++){ + into[intoWritePos] = this.elements[this.readPos++]; + } + + return intoWritePos; + } else { + //length is higher than elements available at the top of the elements array + //split copy into a copy from both top and bottom of elements array. + + //copy from top + for(; this.readPos < capacity; this.readPos++){ + into[intoWritePos++] = this.elements[this.readPos]; + } + + //copy from bottom + this.readPos = 0; + this.flipped = false; + int endPos = Math.min(this.writePos, length - intoWritePos); + for(; this.readPos < endPos; this.readPos++){ + into[intoWritePos++] = this.elements[this.readPos]; + } + + return intoWritePos; + } + } + } + +} diff --git a/src/main/java/com/jenkov/nioserver/Server.java b/src/main/java/com/jenkov/nioserver/Server.java new file mode 100644 index 0000000..f41ef80 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/Server.java @@ -0,0 +1,54 @@ +package com.jenkov.nioserver; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Queue; + +/** + * Created by jjenkov on 19-10-2015. + */ +public class Server implements Runnable{ + + private int tcpPort = 0; + private ServerSocketChannel serverSocket = null; + + private Queue socketQueue = null; + + public Server(int tcpPort, Queue socketQueue) { + this.tcpPort = tcpPort; + this.socketQueue = socketQueue; + } + + + + public void run() { + try{ + this.serverSocket = ServerSocketChannel.open(); + this.serverSocket.bind(new InetSocketAddress(tcpPort)); + } catch(IOException e){ + e.printStackTrace(); + return; + } + + + while(true){ + try{ + SocketChannel socketChannel = this.serverSocket.accept(); + + System.out.println("Socket accepted"); + + //todo check if the queue can even accept more sockets. + this.socketQueue.add(new Socket(socketChannel)); + + + } catch(IOException e){ + e.printStackTrace(); + } + + } + + } +} diff --git a/src/main/java/com/jenkov/nioserver/ServerCore.java b/src/main/java/com/jenkov/nioserver/ServerCore.java new file mode 100644 index 0000000..bb2fd91 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/ServerCore.java @@ -0,0 +1,213 @@ +package com.jenkov.nioserver; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.*; + +/** + * Created by jjenkov on 16-10-2015. + */ +public class ServerCore implements Runnable { + + private MessageBuffer readMessageBuffer = null; + private MessageBuffer writeMessageBuffer = null; + private Queue inboundSocketQueue = null; + private Queue outboundMessageQueue = new LinkedList<>(); //todo use a better / faster queue. + + private Map socketMap = new HashMap<>(); + + private IMessageReaderFactory messageReaderFactory = null; + private IMessageWriterFactory messageWriterFactory = null; + + private ByteBuffer readByteBuffer = ByteBuffer.allocate(1024 * 1024); + private ByteBuffer writeByteBuffer = ByteBuffer.allocate(1024 * 1024); + private Selector readSelector = null; + private Selector writeSelector = null; + + private IMessageProcessor messageProcessor = null; + private WriteProxy writeProxy = null; + + private long nextSocketId = 16 * 1024; //start incoming socket ids from 16K - reserve bottom ids for pre-defined sockets (servers). + + private Set emptyToNonEmptySockets = new HashSet<>(); + private Set nonEmptyToEmptySockets = new HashSet<>(); + + + public ServerCore(Queue inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageWriterFactory messageWriterFactory, IMessageProcessor messageProcessor) throws IOException { + this.inboundSocketQueue = inboundSocketQueue; + + this.readMessageBuffer = readMessageBuffer; + this.writeMessageBuffer = writeMessageBuffer; + this.writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue); + + this.messageReaderFactory = messageReaderFactory; + this.messageWriterFactory = messageWriterFactory; + + this.messageProcessor = messageProcessor; + + this.readSelector = Selector.open(); + this.writeSelector = Selector.open(); + } + + public void run() { + while(true){ + try{ + executeCycle(); + } catch(IOException e){ + e.printStackTrace(); + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + + public void executeCycle() throws IOException { + takeNewSockets(); + readFromSockets(); + writeToSockets(); + } + + + public void takeNewSockets() throws IOException { + Socket newSocket = this.inboundSocketQueue.poll(); + + while(newSocket != null){ + newSocket.socketId = this.nextSocketId++; + newSocket.socketChannel.configureBlocking(false); + + newSocket.messageReader = this.messageReaderFactory.createMessageReader(); + newSocket.messageWriter = this.messageWriterFactory.createMessageWriter(); + + this.socketMap.put(newSocket.socketId, newSocket); + + SelectionKey key = newSocket.socketChannel.register(this.readSelector, SelectionKey.OP_READ); + key.attach(newSocket); + + newSocket = this.inboundSocketQueue.poll(); + } + } + + + public void readFromSockets() throws IOException { + int readReady = this.readSelector.selectNow(); + + if(readReady > 0){ + Set selectedKeys = this.readSelector.selectedKeys(); + Iterator keyIterator = selectedKeys.iterator(); + + while(keyIterator.hasNext()) { + SelectionKey key = keyIterator.next(); + + readFromSocket(key); + + keyIterator.remove(); + } + selectedKeys.clear(); + } + } + + private void readFromSocket(SelectionKey key) throws IOException { + Socket socket = (Socket) key.attachment(); + socket.messageReader.read(socket, this.readByteBuffer); + + List fullMessages = socket.messageReader.getMessages(); + if(fullMessages.size() > 0){ + for(Message message : fullMessages){ + message.socketId = socket.socketId; + this.messageProcessor.process(message, this.writeProxy); //the message processor will eventually push outgoing messages into an IMessageWriter for this socket. + } + fullMessages.clear(); + } + + if(socket.endOfStreamReached){ + this.socketMap.remove(socket.socketId); + key.attach(null); + key.cancel(); + key.channel().close(); + } + } + + + public void writeToSockets() throws IOException { + + // Take all new messages from outboundMessageQueue + takeNewOutboundMessages(); + + // Cancel all sockets which have no more data to write. + cancelEmptySockets(); + + // Register all sockets that *have* data and which are not yet registered. + registerNonEmptySockets(); + + + // Select from the Selector. + + int writeReady = this.writeSelector.selectNow(); + + if(writeReady > 0){ + Set selectionKeys = this.writeSelector.selectedKeys(); + Iterator keyIterator = selectionKeys.iterator(); + + while(keyIterator.hasNext()){ + SelectionKey key = keyIterator.next(); + + Socket socket = (Socket) key.attachment(); + + socket.messageWriter.write(socket, this.writeByteBuffer); + + if(socket.messageWriter.isEmpty()){ + this.nonEmptyToEmptySockets.add(socket); + } + + keyIterator.remove(); + } + + selectionKeys.clear(); + + } + } + + private void registerNonEmptySockets() throws ClosedChannelException { + for(Socket socket : emptyToNonEmptySockets){ + socket.socketChannel.register(this.writeSelector, SelectionKey.OP_WRITE, socket); + } + emptyToNonEmptySockets.clear(); + } + + private void cancelEmptySockets() { + for(Socket socket : nonEmptyToEmptySockets){ + SelectionKey key = socket.socketChannel.keyFor(this.writeSelector); + key.cancel(); + } + nonEmptyToEmptySockets.clear(); + } + + private void takeNewOutboundMessages() { + Message outMessage = this.outboundMessageQueue.poll(); + while(outMessage != null){ + Socket socket = this.socketMap.get(outMessage.socketId); + + if(socket != null){ + IMessageWriter messageWriter = socket.messageWriter; + if(messageWriter.isEmpty()){ + messageWriter.enqueue(outMessage); + nonEmptyToEmptySockets.remove(socket); + emptyToNonEmptySockets.add(socket); //not necessary if removed from nonEmptyToEmptySockets in prev. statement. + } else{ + messageWriter.enqueue(outMessage); + } + } + + outMessage = this.outboundMessageQueue.poll(); + } + } + +} diff --git a/src/main/java/com/jenkov/nioserver/Socket.java b/src/main/java/com/jenkov/nioserver/Socket.java new file mode 100644 index 0000000..00b084c --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/Socket.java @@ -0,0 +1,57 @@ +package com.jenkov.nioserver; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +/** + * Created by jjenkov on 16-10-2015. + */ +public class Socket { + + public long socketId; + + public SocketChannel socketChannel = null; + public IMessageReader messageReader = null; + public IMessageWriter messageWriter = null; + + public boolean endOfStreamReached = false; + + + public Socket() { + } + + public Socket(SocketChannel socketChannel) { + this.socketChannel = socketChannel; + } + + public int read(ByteBuffer byteBuffer) throws IOException { + int bytesRead = this.socketChannel.read(byteBuffer); + int totalBytesRead = bytesRead; + + while(bytesRead > 0){ + bytesRead = this.socketChannel.read(byteBuffer); + totalBytesRead += bytesRead; + } + if(bytesRead == -1){ + this.endOfStreamReached = true; + } + + return totalBytesRead; + } + + + public int write(ByteBuffer byteBuffer) throws IOException{ + int bytesWritten = this.socketChannel.write(byteBuffer); + int totalBytesWritten = bytesWritten; + + while(bytesWritten > 0 && byteBuffer.hasRemaining()){ + bytesWritten = this.socketChannel.write(byteBuffer); + totalBytesWritten += bytesWritten; + } + + return totalBytesWritten; + } + + +} diff --git a/src/main/java/com/jenkov/nioserver/SocketSwitch.java b/src/main/java/com/jenkov/nioserver/SocketSwitch.java new file mode 100644 index 0000000..f34c699 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/SocketSwitch.java @@ -0,0 +1,87 @@ +package com.jenkov.nioserver; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +/** + * + * Todo - rename class to SocketWriter ? + * + * Created by jjenkov on 20-10-2015. + */ +public class SocketSwitch { + + private Map socketMap = new HashMap<>(); + private Map messageWriterMap = new HashMap<>(); + + private Set socketsToCancel = new HashSet<>(); + private Set socketsToRegister = new HashSet<>(); + + + public Set getSocketsToCancel() { + return this.socketsToCancel; + } + + public Set getSocketsToRegister() { + return this.socketsToRegister; + } + + //todo change putMessageWriter to registerSocket(); + public void registerSocket(Socket socket){ + System.out.println("Registering socket: " + socket.socketId ); + this.socketMap.put(socket.socketId, socket); + this.messageWriterMap.put(socket.socketId, socket.messageWriter); + } + + public void enqueueMessage(long socketId, Message message){ + IMessageWriter messageWriter = this.messageWriterMap.get(socketId); + + + + if(messageWriter != null){ + boolean wasEmpty = messageWriter.isEmpty(); + messageWriter.enqueue(message); + + if(wasEmpty){ + Socket socket = this.socketMap.get(socketId); + + if(this.socketsToCancel.contains(socket)){ + //Socket already registered with Selector. Remove cancellation request but + // do not re-register socket with Selector + this.socketsToCancel.remove(socket); + } else { + //Socket not currently registered with Selector. Request registration. + this.socketsToRegister.add(socket); + } + } + } + } + + public void writeToSocket(Socket socket, ByteBuffer writeByteBuffer) throws IOException { + IMessageWriter messageWriter = messageWriterMap.get(socket.socketId); + if(messageWriter != null){ + messageWriter.write(socket, writeByteBuffer); + + if(messageWriter.isEmpty()) { + this.socketsToCancel.add(socket); + } + } + + } + + public void unregisterSocket(long socketId){ + System.out.println("Unregistering socket: " + socketId); + this.socketMap.remove(socketId); + + //todo in case the message writer should survive a "missing connection" - don't remove it. + this.messageWriterMap.remove(socketId); + } + + public void removeMessageWriter(long id){ + this.socketMap.remove(id); + } + + + +} diff --git a/src/main/java/com/jenkov/nioserver/WriteProxy.java b/src/main/java/com/jenkov/nioserver/WriteProxy.java new file mode 100644 index 0000000..2c99529 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/WriteProxy.java @@ -0,0 +1,26 @@ +package com.jenkov.nioserver; + +import java.util.Queue; + +/** + * Created by jjenkov on 22-10-2015. + */ +public class WriteProxy { + + private MessageBuffer messageBuffer = null; + private Queue writeQueue = null; + + public WriteProxy(MessageBuffer messageBuffer, Queue writeQueue) { + this.messageBuffer = messageBuffer; + this.writeQueue = writeQueue; + } + + public Message getMessage(){ + return this.messageBuffer.getMessage(); + } + + public boolean enqueue(Message message){ + return this.writeQueue.offer(message); + } + +} diff --git a/src/main/java/com/jenkov/nioserver/example/Main.java b/src/main/java/com/jenkov/nioserver/example/Main.java new file mode 100644 index 0000000..eaefa2c --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/example/Main.java @@ -0,0 +1,55 @@ +package com.jenkov.nioserver.example; + +import com.jenkov.nioserver.*; +import com.jenkov.nioserver.http.HttpMessageReaderFactory; +import com.jenkov.nioserver.http.HttpMessageWriterFactory; + +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** + * Created by jjenkov on 19-10-2015. + */ +public class Main { + + public static void main(String[] args) throws IOException { + + BlockingQueue inboundSocketQueue = new ArrayBlockingQueue(1024); + + Server server = new Server(9999, inboundSocketQueue); + + Thread serverThread = new Thread(server); + serverThread.start(); + + + MessageBuffer readMessageBuffer = new MessageBuffer(); + MessageBuffer writeMessageBuffer = new MessageBuffer(); + IMessageReaderFactory messageReaderFactory = new HttpMessageReaderFactory(readMessageBuffer); + IMessageWriterFactory messageWriterFactory = new HttpMessageWriterFactory(); + + String httpResponse = "HTTP/1.1 200 OK\r\n" + + "Content-Length: 38\r\n" + + "Content-Type: text/html\r\n" + + "\r\n" + + "Hello World!"; + + byte[] httpResponseBytes = httpResponse.getBytes("UTF-8"); + + IMessageProcessor messageProcessor = (request, writeProxy) -> { + System.out.println("Message Received from socket: " + request.socketId); + + Message response = writeProxy.getMessage(); + response.socketId = request.socketId; + response.writeToMessage(httpResponseBytes); + + writeProxy.enqueue(response); + }; + + ServerCore serverCore = new ServerCore(inboundSocketQueue, readMessageBuffer, writeMessageBuffer, messageReaderFactory, messageWriterFactory, messageProcessor); + Thread serverCoreThread = new Thread(serverCore); + serverCoreThread.start(); + } + + +} diff --git a/src/main/java/com/jenkov/nioserver/http/HttpHeaders.java b/src/main/java/com/jenkov/nioserver/http/HttpHeaders.java new file mode 100644 index 0000000..25e5a1b --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/http/HttpHeaders.java @@ -0,0 +1,27 @@ +package com.jenkov.nioserver.http; + +/** + * Created by jjenkov on 19-10-2015. + */ +public class HttpHeaders { + + public static int HTTP_METHOD_GET = 1; + public static int HTTP_METHOD_POST = 2; + public static int HTTP_METHOD_PUT = 3; + public static int HTTP_METHOD_HEAD = 4; + public static int HTTP_METHOD_DELETE = 5; + + public int httpMethod = 0; + + public int hostStartIndex = 0; + public int hostEndIndex = 0; + + public int contentLength = 0; + + public int bodyStartIndex = 0; + public int bodyEndIndex = 0; + + + + +} diff --git a/src/main/java/com/jenkov/nioserver/http/HttpMessageReader.java b/src/main/java/com/jenkov/nioserver/http/HttpMessageReader.java new file mode 100644 index 0000000..41ce706 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/http/HttpMessageReader.java @@ -0,0 +1,60 @@ +package com.jenkov.nioserver.http; + +import com.jenkov.nioserver.IMessageReader; +import com.jenkov.nioserver.Message; +import com.jenkov.nioserver.MessageBuffer; +import com.jenkov.nioserver.Socket; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by jjenkov on 18-10-2015. + */ +public class HttpMessageReader implements IMessageReader { + + private MessageBuffer messageBuffer = null; + + private List completeMessages = new ArrayList(); + private Message nextMessage = null; + + public HttpMessageReader(MessageBuffer messageBuffer) { + this.messageBuffer = messageBuffer; + this.nextMessage = messageBuffer.getMessage(); + this.nextMessage.metaData = new HttpHeaders(); + } + + @Override + public void read(Socket socket, ByteBuffer byteBuffer) throws IOException { + int bytesRead = socket.read(byteBuffer); + byteBuffer.flip(); + + if(byteBuffer.remaining() == 0){ + byteBuffer.clear(); + return; + } + + this.nextMessage.writeToMessage(byteBuffer); + + int endIndex = HttpUtil.parseHttpRequest(this.nextMessage.sharedArray, this.nextMessage.offset, this.nextMessage.offset + this.nextMessage.length, (HttpHeaders) this.nextMessage.metaData); + if(endIndex != -1){ + Message message = this.messageBuffer.getMessage(); + message.metaData = new HttpHeaders(); + + message.writePartialMessageToMessage(nextMessage, endIndex); + + completeMessages.add(nextMessage); + nextMessage = message; + } + byteBuffer.clear(); + } + + + @Override + public List getMessages() { + return this.completeMessages; + } + +} diff --git a/src/main/java/com/jenkov/nioserver/http/HttpMessageReaderFactory.java b/src/main/java/com/jenkov/nioserver/http/HttpMessageReaderFactory.java new file mode 100644 index 0000000..e2fe3da --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/http/HttpMessageReaderFactory.java @@ -0,0 +1,22 @@ +package com.jenkov.nioserver.http; + +import com.jenkov.nioserver.IMessageReader; +import com.jenkov.nioserver.IMessageReaderFactory; +import com.jenkov.nioserver.MessageBuffer; + +/** + * Created by jjenkov on 18-10-2015. + */ +public class HttpMessageReaderFactory implements IMessageReaderFactory { + + private MessageBuffer messageBuffer = null; + + public HttpMessageReaderFactory(MessageBuffer messageBuffer) { + this.messageBuffer = messageBuffer; + } + + @Override + public IMessageReader createMessageReader() { + return new HttpMessageReader(this.messageBuffer); + } +} diff --git a/src/main/java/com/jenkov/nioserver/http/HttpMessageWriter.java b/src/main/java/com/jenkov/nioserver/http/HttpMessageWriter.java new file mode 100644 index 0000000..32df192 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/http/HttpMessageWriter.java @@ -0,0 +1,64 @@ +package com.jenkov.nioserver.http; + +import com.jenkov.nioserver.IMessageWriter; +import com.jenkov.nioserver.Message; +import com.jenkov.nioserver.SocketSwitch; +import com.jenkov.nioserver.Socket; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by jjenkov on 21-10-2015. + */ +public class HttpMessageWriter implements IMessageWriter { + + private List writeQueue = new ArrayList<>(); + private Message messageInProgress = null; + private int bytesWritten = 0; + + public HttpMessageWriter() { + } + + @Override + public void enqueue(Message message) { + if(this.messageInProgress == null){ + this.messageInProgress = message; + System.out.println("Message set as message in progress."); + } else { + this.writeQueue.add(message); + System.out.println("Message enqueued."); + } + + //todo register socket for write interest + + } + + + @Override + public void write(Socket socket, ByteBuffer byteBuffer) throws IOException { + System.out.println("Writing message to socket"); + + byteBuffer.put(this.messageInProgress.sharedArray, this.messageInProgress.offset + this.bytesWritten, this.messageInProgress.length - this.bytesWritten); + byteBuffer.flip(); + + this.bytesWritten += socket.write(byteBuffer); + byteBuffer.clear(); + + if(bytesWritten >= this.messageInProgress.length){ + if(this.writeQueue.size() > 0){ + this.messageInProgress = this.writeQueue.remove(0); + } else { + this.messageInProgress = null; + //todo unregister from selector + } + } + } + + @Override + public boolean isEmpty() { + return this.writeQueue.isEmpty() && this.messageInProgress == null; + } +} diff --git a/src/main/java/com/jenkov/nioserver/http/HttpMessageWriterFactory.java b/src/main/java/com/jenkov/nioserver/http/HttpMessageWriterFactory.java new file mode 100644 index 0000000..f4e3966 --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/http/HttpMessageWriterFactory.java @@ -0,0 +1,18 @@ +package com.jenkov.nioserver.http; + +import com.jenkov.nioserver.IMessageWriter; +import com.jenkov.nioserver.IMessageWriterFactory; + +import java.nio.channels.Selector; + +/** + * Created by jjenkov on 19-10-2015. + */ +public class HttpMessageWriterFactory implements IMessageWriterFactory { + + @Override + public IMessageWriter createMessageWriter() { + return new HttpMessageWriter(); + //return null; + } +} diff --git a/src/main/java/com/jenkov/nioserver/http/HttpUtil.java b/src/main/java/com/jenkov/nioserver/http/HttpUtil.java new file mode 100644 index 0000000..ed2904d --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/http/HttpUtil.java @@ -0,0 +1,155 @@ +package com.jenkov.nioserver.http; + +import java.io.UnsupportedEncodingException; + +/** + * Created by jjenkov on 19-10-2015. + */ +public class HttpUtil { + + private static final byte[] GET = new byte[]{'G','E','T'}; + private static final byte[] POST = new byte[]{'P','O','S','T'}; + private static final byte[] PUT = new byte[]{'P','U','T'}; + private static final byte[] HEAD = new byte[]{'H','E','A','D'}; + private static final byte[] DELETE = new byte[]{'D','E','L','E','T','E'}; + + private static final byte[] HOST = new byte[]{'H','o','s','t'}; + private static final byte[] CONTENT_LENGTH = new byte[]{'C','o','n','t','e','n','t','-','L','e','n','g','t','h'}; + + public static int parseHttpRequest(byte[] src, int startIndex, int endIndex, HttpHeaders httpHeaders){ + + + /* + int endOfHttpMethod = findNext(src, startIndex, endIndex, (byte) ' '); + if(endOfHttpMethod == -1) return false; + resolveHttpMethod(src, startIndex, httpHeaders); + */ + + //parse HTTP request line + int endOfFirstLine = findNextLineBreak(src, startIndex, endIndex); + if(endOfFirstLine == -1) return -1; + + + //parse HTTP headers + int prevEndOfHeader = endOfFirstLine + 1; + int endOfHeader = findNextLineBreak(src, prevEndOfHeader, endIndex); + + while(endOfHeader != -1 && endOfHeader != prevEndOfHeader + 1){ //prevEndOfHeader + 1 = end of previous header + 2 (+2 = CR + LF) + + if(matches(src, prevEndOfHeader, CONTENT_LENGTH)){ + try { + findContentLength(src, prevEndOfHeader, endIndex, httpHeaders); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + + prevEndOfHeader = endOfHeader + 1; + endOfHeader = findNextLineBreak(src, prevEndOfHeader, endIndex); + } + + if(endOfHeader == -1){ + return -1; + } + + //check that byte array contains full HTTP message. + int bodyStartIndex = endOfHeader + 1; + int bodyEndIndex = bodyStartIndex + httpHeaders.contentLength; + + if(bodyEndIndex <= endIndex){ + //byte array contains a full HTTP request + httpHeaders.bodyStartIndex = bodyStartIndex; + httpHeaders.bodyEndIndex = bodyEndIndex; + return bodyEndIndex; + } + + + return -1; + } + + private static void findContentLength(byte[] src, int startIndex, int endIndex, HttpHeaders httpHeaders) throws UnsupportedEncodingException { + int indexOfColon = findNext(src, startIndex, endIndex, (byte) ':'); + + //skip spaces after colon + int index = indexOfColon +1; + while(src[index] == ' '){ + index++; + } + + int valueStartIndex = index; + int valueEndIndex = index; + boolean endOfValueFound = false; + + while(index < endIndex && !endOfValueFound){ + switch(src[index]){ + case '0' : ; + case '1' : ; + case '2' : ; + case '3' : ; + case '4' : ; + case '5' : ; + case '6' : ; + case '7' : ; + case '8' : ; + case '9' : { index++; break; } + + default: { + endOfValueFound = true; + valueEndIndex = index; + } + } + } + + httpHeaders.contentLength = Integer.parseInt(new String(src, valueStartIndex, valueEndIndex - valueStartIndex, "UTF-8")); + + } + + + public static int findNext(byte[] src, int startIndex, int endIndex, byte value){ + for(int index = startIndex; index < endIndex; index++){ + if(src[index] == value) return index; + } + return -1; + } + + public static int findNextLineBreak(byte[] src, int startIndex, int endIndex) { + for(int index = startIndex; index < endIndex; index++){ + if(src[index] == '\n'){ + if(src[index - 1] == '\r'){ + return index; + } + }; + } + return -1; + } + + public static void resolveHttpMethod(byte[] src, int startIndex, HttpHeaders httpHeaders){ + if(matches(src, startIndex, GET)) { + httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_GET; + return; + } + if(matches(src, startIndex, POST)){ + httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_POST; + return; + } + if(matches(src, startIndex, PUT)){ + httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_PUT; + return; + } + if(matches(src, startIndex, HEAD)){ + httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_HEAD; + return; + } + if(matches(src, startIndex, DELETE)){ + httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_DELETE; + return; + } + } + + public static boolean matches(byte[] src, int offset, byte[] value){ + for(int i=offset, n=0; n < value.length; i++, n++){ + if(src[i] != value[n]) return false; + } + return true; + } +} diff --git a/src/test/java/com.jenkov.nioserver/MessageBufferTest.java b/src/test/java/com.jenkov.nioserver/MessageBufferTest.java new file mode 100644 index 0000000..b1e1cda --- /dev/null +++ b/src/test/java/com.jenkov.nioserver/MessageBufferTest.java @@ -0,0 +1,80 @@ +package com.jenkov.nioserver; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; + +/** + * Created by jjenkov on 18-10-2015. + */ +public class MessageBufferTest { + + @Test + public void testGetMessage() { + + MessageBuffer messageBuffer = new MessageBuffer(); + + Message message = messageBuffer.getMessage(); + + assertNotNull(message); + assertEquals(0 , message.offset); + assertEquals(0 , message.length); + assertEquals(4 * 1024, message.capacity); + + Message message2 = messageBuffer.getMessage(); + + assertNotNull(message2); + assertEquals(4096 , message2.offset); + assertEquals(0 , message2.length); + assertEquals(4 * 1024, message2.capacity); + + //todo test what happens if the small buffer space is depleted of messages. + + } + + + @Test + public void testExpandMessage(){ + MessageBuffer messageBuffer = new MessageBuffer(); + + Message message = messageBuffer.getMessage(); + + byte[] smallSharedArray = message.sharedArray; + + assertNotNull(message); + assertEquals(0 , message.offset); + assertEquals(0 , message.length); + assertEquals(4 * 1024, message.capacity); + + messageBuffer.expandMessage(message); + assertEquals(0 , message.offset); + assertEquals(0 , message.length); + assertEquals(128 * 1024, message.capacity); + + byte[] mediumSharedArray = message.sharedArray; + assertNotSame(smallSharedArray, mediumSharedArray); + + messageBuffer.expandMessage(message); + assertEquals(0 , message.offset); + assertEquals(0 , message.length); + assertEquals(1024 * 1024, message.capacity); + + byte[] largeSharedArray = message.sharedArray; + assertNotSame(smallSharedArray, largeSharedArray); + assertNotSame(mediumSharedArray, largeSharedArray); + + //next expansion should not be possible. + assertFalse(messageBuffer.expandMessage(message)); + assertEquals(0 , message.offset); + assertEquals(0 , message.length); + assertEquals(1024 * 1024, message.capacity); + assertSame(message.sharedArray, largeSharedArray); + + + + } +} diff --git a/src/test/java/com.jenkov.nioserver/MessageTest.java b/src/test/java/com.jenkov.nioserver/MessageTest.java new file mode 100644 index 0000000..dcc070f --- /dev/null +++ b/src/test/java/com.jenkov.nioserver/MessageTest.java @@ -0,0 +1,59 @@ +package com.jenkov.nioserver; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; + +import java.nio.ByteBuffer; + + +/** + * Created by jjenkov on 18-10-2015. + */ +public class MessageTest { + + + @Test + public void testWriteToMessage() { + MessageBuffer messageBuffer = new MessageBuffer(); + + Message message = messageBuffer.getMessage(); + ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024); + + fill(byteBuffer, 4096); + + int written = message.writeToMessage(byteBuffer); + assertEquals(4096, written); + assertEquals(4096, message.length); + assertSame(messageBuffer.smallMessageBuffer, message.sharedArray); + + fill(byteBuffer, 124 * 1024); + written = message.writeToMessage(byteBuffer); + assertEquals(124 * 1024, written); + assertEquals(128 * 1024, message.length); + assertSame(messageBuffer.mediumMessageBuffer, message.sharedArray); + + fill(byteBuffer, (1024-128) * 1024); + written = message.writeToMessage(byteBuffer); + assertEquals(896 * 1024, written); + assertEquals(1024 * 1024, message.length); + assertSame(messageBuffer.largeMessageBuffer, message.sharedArray); + + fill(byteBuffer, 1); + written = message.writeToMessage(byteBuffer); + assertEquals(-1, written); + + } + + private void fill(ByteBuffer byteBuffer, int length){ + byteBuffer.clear(); + for(int i=0; i