First, not-so-clean version of the NIO server.

This commit is contained in:
Jakob Jenkov 2015-10-22 20:02:55 +02:00
parent ef91a57682
commit 4bb99c6c38
25 changed files with 1611 additions and 0 deletions

View File

@ -0,0 +1,10 @@
package com.jenkov.nioserver;
/**
* Created by jjenkov on 16-10-2015.
*/
public interface IMessageProcessor {
public void process(Message message, WriteProxy writeProxy);
}

View File

@ -0,0 +1,18 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
/**
* Created by jjenkov on 16-10-2015.
*/
public interface IMessageReader {
public void read(Socket socket, ByteBuffer byteBuffer) throws IOException;
public List<Message> getMessages();
}

View File

@ -0,0 +1,9 @@
package com.jenkov.nioserver;
/**
* Created by jjenkov on 16-10-2015.
*/
public interface IMessageReaderFactory {
public IMessageReader createMessageReader();
}

View File

@ -0,0 +1,23 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
/**
* Created by jjenkov on 16-10-2015.
*/
public interface IMessageWriter {
/**
* Called by IMessageProcessor - so no access to write Selector from here.
*
* @param message
*/
public void enqueue(Message message);
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException;
public boolean isEmpty();
}

View File

@ -0,0 +1,12 @@
package com.jenkov.nioserver;
import java.nio.channels.Selector;
/**
* Created by jjenkov on 16-10-2015.
*/
public interface IMessageWriterFactory {
public IMessageWriter createMessageWriter();
}

View File

@ -0,0 +1,105 @@
package com.jenkov.nioserver;
import java.nio.ByteBuffer;
/**
* Created by jjenkov on 16-10-2015.
*/
public class Message {
private MessageBuffer messageBuffer = null;
public long socketId = 0; // the id of source socket or destination socket, depending on whether is going in or out.
public byte[] sharedArray = null;
public int offset = 0; //offset into sharedArray where this message data starts.
public int capacity = 0; //the size of the section in the sharedArray allocated to this message.
public int length = 0; //the number of bytes used of the allocated section.
public Object metaData = null;
public Message(MessageBuffer messageBuffer) {
this.messageBuffer = messageBuffer;
}
/**
* Writes data from the ByteBuffer into this message - meaning into the buffer backing this message.
*
* @param byteBuffer The ByteBuffer containing the message data to write.
* @return
*/
public int writeToMessage(ByteBuffer byteBuffer){
int remaining = byteBuffer.remaining();
while(this.length + remaining > capacity){
if(!this.messageBuffer.expandMessage(this)) {
return -1;
}
}
int bytesToCopy = Math.min(remaining, this.capacity - this.length);
byteBuffer.get(this.sharedArray, this.offset + this.length, bytesToCopy);
this.length += bytesToCopy;
return bytesToCopy;
}
/**
* Writes data from the byte array into this message - meaning into the buffer backing this message.
*
* @param byteArray The byte array containing the message data to write.
* @return
*/
public int writeToMessage(byte[] byteArray){
return writeToMessage(byteArray, 0, byteArray.length);
}
/**
* Writes data from the byte array into this message - meaning into the buffer backing this message.
*
* @param byteArray The byte array containing the message data to write.
* @return
*/
public int writeToMessage(byte[] byteArray, int offset, int length){
int remaining = length;
while(this.length + remaining > capacity){
if(!this.messageBuffer.expandMessage(this)) {
return -1;
}
}
int bytesToCopy = Math.min(remaining, this.capacity - this.length);
System.arraycopy(byteArray, offset, this.sharedArray, this.offset + this.length, bytesToCopy);
this.length += bytesToCopy;
return bytesToCopy;
}
/**
* In case the buffer backing the nextMessage contains more than one HTTP message, move all data after the first
* message to a new Message object.
*
* @param message The message containing the partial message (after the first message).
* @param endIndex The end index of the first message in the buffer of the message given as parameter.
*/
public void writePartialMessageToMessage(Message message, int endIndex){
int startIndexOfPartialMessage = message.offset + endIndex;
int lengthOfPartialMessage = (message.offset + message.length) - endIndex;
System.arraycopy(message.sharedArray, startIndexOfPartialMessage, this.sharedArray, this.offset, lengthOfPartialMessage);
}
public int writeToByteBuffer(ByteBuffer byteBuffer){
return 0;
}
}

View File

@ -0,0 +1,88 @@
package com.jenkov.nioserver;
/**
* A shared buffer which can contain many messages inside. A message gets a section of the buffer to use. If the
* message outgrows the section in size, the message requests a larger section and the message is copied to that
* larger section. The smaller section is then freed again.
*
*
* Created by jjenkov on 18-10-2015.
*/
public class MessageBuffer {
public static int KB = 1024;
public static int MB = 1024 * KB;
private static final int CAPACITY_SMALL = 4 * KB;
private static final int CAPACITY_MEDIUM = 128 * KB;
private static final int CAPACITY_LARGE = 1024 * KB;
//package scope (default) - so they can be accessed from unit tests.
byte[] smallMessageBuffer = new byte[1024 * 4 * KB]; //1024 x 4KB messages = 4MB.
byte[] mediumMessageBuffer = new byte[128 * 128 * KB]; // 128 x 128KB messages = 16MB.
byte[] largeMessageBuffer = new byte[16 * 1 * MB]; // 16 * 1MB messages = 16MB.
QueueIntFlip smallMessageBufferFreeBlocks = new QueueIntFlip(1024); // 1024 free sections
QueueIntFlip mediumMessageBufferFreeBlocks = new QueueIntFlip(128); // 128 free sections
QueueIntFlip largeMessageBufferFreeBlocks = new QueueIntFlip(16); // 16 free sections
//todo make all message buffer capacities and block sizes configurable
//todo calculate free block queue sizes based on capacity and block size of buffers.
public MessageBuffer() {
//add all free sections to all free section queues.
for(int i=0; i<smallMessageBuffer.length; i+= CAPACITY_SMALL){
this.smallMessageBufferFreeBlocks.put(i);
}
for(int i=0; i<mediumMessageBuffer.length; i+= CAPACITY_MEDIUM){
this.mediumMessageBufferFreeBlocks.put(i);
}
for(int i=0; i<largeMessageBuffer.length; i+= CAPACITY_LARGE){
this.largeMessageBufferFreeBlocks.put(i);
}
}
public Message getMessage() {
int nextFreeSmallBlock = this.smallMessageBufferFreeBlocks.take();
if(nextFreeSmallBlock == -1) return null;
Message message = new Message(this); //todo get from Message pool - caps memory usage.
message.sharedArray = this.smallMessageBuffer;
message.capacity = CAPACITY_SMALL;
message.offset = nextFreeSmallBlock;
message.length = 0;
return message;
}
public boolean expandMessage(Message message){
if(message.capacity == CAPACITY_SMALL){
return moveMessage(message, this.smallMessageBufferFreeBlocks, this.mediumMessageBufferFreeBlocks, this.mediumMessageBuffer, CAPACITY_MEDIUM);
} else if(message.capacity == CAPACITY_MEDIUM){
return moveMessage(message, this.mediumMessageBufferFreeBlocks, this.largeMessageBufferFreeBlocks, this.largeMessageBuffer, CAPACITY_LARGE);
} else {
return false;
}
}
private boolean moveMessage(Message message, QueueIntFlip srcBlockQueue, QueueIntFlip destBlockQueue, byte[] dest, int newCapacity) {
int nextFreeBlock = destBlockQueue.take();
if(nextFreeBlock == -1) return false;
System.arraycopy(message.sharedArray, message.offset, dest, nextFreeBlock, message.length);
srcBlockQueue.put(message.offset); //free smaller block after copy
message.sharedArray = dest;
message.offset = nextFreeBlock;
message.capacity = newCapacity;
return true;
}
}

View File

@ -0,0 +1,59 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* Created by jjenkov on 21-10-2015.
*/
public class MessageWriter implements IMessageWriter {
private List<Message> writeQueue = new ArrayList<>();
private Message messageInProgress = null;
private int bytesWritten = 0;
public MessageWriter() {
}
@Override
public void enqueue(Message message) {
if(this.messageInProgress == null){
this.messageInProgress = message;
System.out.println("Message set as message in progress.");
} else {
this.writeQueue.add(message);
System.out.println("Message enqueued.");
}
//todo register socket for write interest
}
@Override
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
System.out.println("Writing message to socket");
byteBuffer.put(this.messageInProgress.sharedArray, this.messageInProgress.offset + this.bytesWritten, this.messageInProgress.length - this.bytesWritten);
byteBuffer.flip();
this.bytesWritten += socket.write(byteBuffer);
byteBuffer.clear();
if(bytesWritten >= this.messageInProgress.length){
if(this.writeQueue.size() > 0){
this.messageInProgress = this.writeQueue.remove(0);
} else {
this.messageInProgress = null;
//todo unregister from selector
}
}
}
@Override
public boolean isEmpty() {
return this.writeQueue.isEmpty() && this.messageInProgress == null;
}
}

View File

@ -0,0 +1,186 @@
package com.jenkov.nioserver;
/**
* Same as QueueFillCount, except that QueueFlip uses a flip flag to keep track of when the internal writePos has
* "overflowed" (meaning it goes back to 0). Other than that, the two implementations are very similar in functionality.
*
* One additional difference is that QueueFlip has an available() method, where this is a public variable in
* QueueFillCount.
*
* Created by jjenkov on 18-09-2015.
*/
public class QueueIntFlip {
public int[] elements = null;
public int capacity = 0;
public int writePos = 0;
public int readPos = 0;
public boolean flipped = false;
public QueueIntFlip(int capacity) {
this.capacity = capacity;
this.elements = new int[capacity]; //todo get from TypeAllocator ?
}
public void reset() {
this.writePos = 0;
this.readPos = 0;
this.flipped = false;
}
public int available() {
if(!flipped){
return writePos - readPos;
}
return capacity - readPos + writePos;
}
public int remainingCapacity() {
if(!flipped){
return capacity - writePos;
}
return readPos - writePos;
}
public boolean put(int element){
if(!flipped){
if(writePos == capacity){
writePos = 0;
flipped = true;
if(writePos < readPos){
elements[writePos++] = element;
return true;
} else {
return false;
}
} else {
elements[writePos++] = element;
return true;
}
} else {
if(writePos < readPos ){
elements[writePos++] = element;
return true;
} else {
return false;
}
}
}
public int put(int[] newElements, int length){
int newElementsReadPos = 0;
if(!flipped){
//readPos lower than writePos - free sections are:
//1) from writePos to capacity
//2) from 0 to readPos
if(length <= capacity - writePos){
//new elements fit into top of elements array - copy directly
for(; newElementsReadPos < length; newElementsReadPos++){
this.elements[this.writePos++] = newElements[newElementsReadPos];
}
return newElementsReadPos;
} else {
//new elements must be divided between top and bottom of elements array
//writing to top
for(;this.writePos < capacity; this.writePos++){
this.elements[this.writePos] = newElements[newElementsReadPos++];
}
//writing to bottom
this.writePos = 0;
this.flipped = true;
int endPos = Math.min(this.readPos, length - newElementsReadPos);
for(; this.writePos < endPos; this.writePos++){
this.elements[writePos] = newElements[newElementsReadPos++];
}
return newElementsReadPos;
}
} else {
//readPos higher than writePos - free sections are:
//1) from writePos to readPos
int endPos = Math.min(this.readPos, this.writePos + length);
for(; this.writePos < endPos; this.writePos++){
this.elements[this.writePos] = newElements[newElementsReadPos++];
}
return newElementsReadPos;
}
}
public int take() {
if(!flipped){
if(readPos < writePos){
return elements[readPos++];
} else {
return -1;
}
} else {
if(readPos == capacity){
readPos = 0;
flipped = false;
if(readPos < writePos){
return elements[readPos++];
} else {
return -1;
}
} else {
return elements[readPos++];
}
}
}
public int take(int[] into, int length){
int intoWritePos = 0;
if(!flipped){
//writePos higher than readPos - available section is writePos - readPos
int endPos = Math.min(this.writePos, this.readPos + length);
for(; this.readPos < endPos; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
return intoWritePos;
} else {
//readPos higher than writePos - available sections are top + bottom of elements array
if(length <= capacity - readPos){
//length is lower than the elements available at the top of the elements array - copy directly
for(; intoWritePos < length; intoWritePos++){
into[intoWritePos] = this.elements[this.readPos++];
}
return intoWritePos;
} else {
//length is higher than elements available at the top of the elements array
//split copy into a copy from both top and bottom of elements array.
//copy from top
for(; this.readPos < capacity; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
//copy from bottom
this.readPos = 0;
this.flipped = false;
int endPos = Math.min(this.writePos, length - intoWritePos);
for(; this.readPos < endPos; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
return intoWritePos;
}
}
}
}

View File

@ -0,0 +1,54 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
/**
* Created by jjenkov on 19-10-2015.
*/
public class Server implements Runnable{
private int tcpPort = 0;
private ServerSocketChannel serverSocket = null;
private Queue socketQueue = null;
public Server(int tcpPort, Queue socketQueue) {
this.tcpPort = tcpPort;
this.socketQueue = socketQueue;
}
public void run() {
try{
this.serverSocket = ServerSocketChannel.open();
this.serverSocket.bind(new InetSocketAddress(tcpPort));
} catch(IOException e){
e.printStackTrace();
return;
}
while(true){
try{
SocketChannel socketChannel = this.serverSocket.accept();
System.out.println("Socket accepted");
//todo check if the queue can even accept more sockets.
this.socketQueue.add(new Socket(socketChannel));
} catch(IOException e){
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,213 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.*;
/**
* Created by jjenkov on 16-10-2015.
*/
public class ServerCore implements Runnable {
private MessageBuffer readMessageBuffer = null;
private MessageBuffer writeMessageBuffer = null;
private Queue<Socket> inboundSocketQueue = null;
private Queue<Message> outboundMessageQueue = new LinkedList<>(); //todo use a better / faster queue.
private Map<Long, Socket> socketMap = new HashMap<>();
private IMessageReaderFactory messageReaderFactory = null;
private IMessageWriterFactory messageWriterFactory = null;
private ByteBuffer readByteBuffer = ByteBuffer.allocate(1024 * 1024);
private ByteBuffer writeByteBuffer = ByteBuffer.allocate(1024 * 1024);
private Selector readSelector = null;
private Selector writeSelector = null;
private IMessageProcessor messageProcessor = null;
private WriteProxy writeProxy = null;
private long nextSocketId = 16 * 1024; //start incoming socket ids from 16K - reserve bottom ids for pre-defined sockets (servers).
private Set<Socket> emptyToNonEmptySockets = new HashSet<>();
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
public ServerCore(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageWriterFactory messageWriterFactory, IMessageProcessor messageProcessor) throws IOException {
this.inboundSocketQueue = inboundSocketQueue;
this.readMessageBuffer = readMessageBuffer;
this.writeMessageBuffer = writeMessageBuffer;
this.writeProxy = new WriteProxy(writeMessageBuffer, this.outboundMessageQueue);
this.messageReaderFactory = messageReaderFactory;
this.messageWriterFactory = messageWriterFactory;
this.messageProcessor = messageProcessor;
this.readSelector = Selector.open();
this.writeSelector = Selector.open();
}
public void run() {
while(true){
try{
executeCycle();
} catch(IOException e){
e.printStackTrace();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void executeCycle() throws IOException {
takeNewSockets();
readFromSockets();
writeToSockets();
}
public void takeNewSockets() throws IOException {
Socket newSocket = this.inboundSocketQueue.poll();
while(newSocket != null){
newSocket.socketId = this.nextSocketId++;
newSocket.socketChannel.configureBlocking(false);
newSocket.messageReader = this.messageReaderFactory.createMessageReader();
newSocket.messageWriter = this.messageWriterFactory.createMessageWriter();
this.socketMap.put(newSocket.socketId, newSocket);
SelectionKey key = newSocket.socketChannel.register(this.readSelector, SelectionKey.OP_READ);
key.attach(newSocket);
newSocket = this.inboundSocketQueue.poll();
}
}
public void readFromSockets() throws IOException {
int readReady = this.readSelector.selectNow();
if(readReady > 0){
Set<SelectionKey> selectedKeys = this.readSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
readFromSocket(key);
keyIterator.remove();
}
selectedKeys.clear();
}
}
private void readFromSocket(SelectionKey key) throws IOException {
Socket socket = (Socket) key.attachment();
socket.messageReader.read(socket, this.readByteBuffer);
List<Message> fullMessages = socket.messageReader.getMessages();
if(fullMessages.size() > 0){
for(Message message : fullMessages){
message.socketId = socket.socketId;
this.messageProcessor.process(message, this.writeProxy); //the message processor will eventually push outgoing messages into an IMessageWriter for this socket.
}
fullMessages.clear();
}
if(socket.endOfStreamReached){
this.socketMap.remove(socket.socketId);
key.attach(null);
key.cancel();
key.channel().close();
}
}
public void writeToSockets() throws IOException {
// Take all new messages from outboundMessageQueue
takeNewOutboundMessages();
// Cancel all sockets which have no more data to write.
cancelEmptySockets();
// Register all sockets that *have* data and which are not yet registered.
registerNonEmptySockets();
// Select from the Selector.
int writeReady = this.writeSelector.selectNow();
if(writeReady > 0){
Set<SelectionKey> selectionKeys = this.writeSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while(keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
Socket socket = (Socket) key.attachment();
socket.messageWriter.write(socket, this.writeByteBuffer);
if(socket.messageWriter.isEmpty()){
this.nonEmptyToEmptySockets.add(socket);
}
keyIterator.remove();
}
selectionKeys.clear();
}
}
private void registerNonEmptySockets() throws ClosedChannelException {
for(Socket socket : emptyToNonEmptySockets){
socket.socketChannel.register(this.writeSelector, SelectionKey.OP_WRITE, socket);
}
emptyToNonEmptySockets.clear();
}
private void cancelEmptySockets() {
for(Socket socket : nonEmptyToEmptySockets){
SelectionKey key = socket.socketChannel.keyFor(this.writeSelector);
key.cancel();
}
nonEmptyToEmptySockets.clear();
}
private void takeNewOutboundMessages() {
Message outMessage = this.outboundMessageQueue.poll();
while(outMessage != null){
Socket socket = this.socketMap.get(outMessage.socketId);
if(socket != null){
IMessageWriter messageWriter = socket.messageWriter;
if(messageWriter.isEmpty()){
messageWriter.enqueue(outMessage);
nonEmptyToEmptySockets.remove(socket);
emptyToNonEmptySockets.add(socket); //not necessary if removed from nonEmptyToEmptySockets in prev. statement.
} else{
messageWriter.enqueue(outMessage);
}
}
outMessage = this.outboundMessageQueue.poll();
}
}
}

View File

@ -0,0 +1,57 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* Created by jjenkov on 16-10-2015.
*/
public class Socket {
public long socketId;
public SocketChannel socketChannel = null;
public IMessageReader messageReader = null;
public IMessageWriter messageWriter = null;
public boolean endOfStreamReached = false;
public Socket() {
}
public Socket(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public int read(ByteBuffer byteBuffer) throws IOException {
int bytesRead = this.socketChannel.read(byteBuffer);
int totalBytesRead = bytesRead;
while(bytesRead > 0){
bytesRead = this.socketChannel.read(byteBuffer);
totalBytesRead += bytesRead;
}
if(bytesRead == -1){
this.endOfStreamReached = true;
}
return totalBytesRead;
}
public int write(ByteBuffer byteBuffer) throws IOException{
int bytesWritten = this.socketChannel.write(byteBuffer);
int totalBytesWritten = bytesWritten;
while(bytesWritten > 0 && byteBuffer.hasRemaining()){
bytesWritten = this.socketChannel.write(byteBuffer);
totalBytesWritten += bytesWritten;
}
return totalBytesWritten;
}
}

View File

@ -0,0 +1,87 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
/**
*
* Todo - rename class to SocketWriter ?
*
* Created by jjenkov on 20-10-2015.
*/
public class SocketSwitch {
private Map<Long, Socket> socketMap = new HashMap<>();
private Map<Long, IMessageWriter> messageWriterMap = new HashMap<>();
private Set<Socket> socketsToCancel = new HashSet<>();
private Set<Socket> socketsToRegister = new HashSet<>();
public Set<Socket> getSocketsToCancel() {
return this.socketsToCancel;
}
public Set<Socket> getSocketsToRegister() {
return this.socketsToRegister;
}
//todo change putMessageWriter to registerSocket();
public void registerSocket(Socket socket){
System.out.println("Registering socket: " + socket.socketId );
this.socketMap.put(socket.socketId, socket);
this.messageWriterMap.put(socket.socketId, socket.messageWriter);
}
public void enqueueMessage(long socketId, Message message){
IMessageWriter messageWriter = this.messageWriterMap.get(socketId);
if(messageWriter != null){
boolean wasEmpty = messageWriter.isEmpty();
messageWriter.enqueue(message);
if(wasEmpty){
Socket socket = this.socketMap.get(socketId);
if(this.socketsToCancel.contains(socket)){
//Socket already registered with Selector. Remove cancellation request but
// do not re-register socket with Selector
this.socketsToCancel.remove(socket);
} else {
//Socket not currently registered with Selector. Request registration.
this.socketsToRegister.add(socket);
}
}
}
}
public void writeToSocket(Socket socket, ByteBuffer writeByteBuffer) throws IOException {
IMessageWriter messageWriter = messageWriterMap.get(socket.socketId);
if(messageWriter != null){
messageWriter.write(socket, writeByteBuffer);
if(messageWriter.isEmpty()) {
this.socketsToCancel.add(socket);
}
}
}
public void unregisterSocket(long socketId){
System.out.println("Unregistering socket: " + socketId);
this.socketMap.remove(socketId);
//todo in case the message writer should survive a "missing connection" - don't remove it.
this.messageWriterMap.remove(socketId);
}
public void removeMessageWriter(long id){
this.socketMap.remove(id);
}
}

View File

@ -0,0 +1,26 @@
package com.jenkov.nioserver;
import java.util.Queue;
/**
* Created by jjenkov on 22-10-2015.
*/
public class WriteProxy {
private MessageBuffer messageBuffer = null;
private Queue writeQueue = null;
public WriteProxy(MessageBuffer messageBuffer, Queue writeQueue) {
this.messageBuffer = messageBuffer;
this.writeQueue = writeQueue;
}
public Message getMessage(){
return this.messageBuffer.getMessage();
}
public boolean enqueue(Message message){
return this.writeQueue.offer(message);
}
}

View File

@ -0,0 +1,55 @@
package com.jenkov.nioserver.example;
import com.jenkov.nioserver.*;
import com.jenkov.nioserver.http.HttpMessageReaderFactory;
import com.jenkov.nioserver.http.HttpMessageWriterFactory;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Created by jjenkov on 19-10-2015.
*/
public class Main {
public static void main(String[] args) throws IOException {
BlockingQueue inboundSocketQueue = new ArrayBlockingQueue(1024);
Server server = new Server(9999, inboundSocketQueue);
Thread serverThread = new Thread(server);
serverThread.start();
MessageBuffer readMessageBuffer = new MessageBuffer();
MessageBuffer writeMessageBuffer = new MessageBuffer();
IMessageReaderFactory messageReaderFactory = new HttpMessageReaderFactory(readMessageBuffer);
IMessageWriterFactory messageWriterFactory = new HttpMessageWriterFactory();
String httpResponse = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 38\r\n" +
"Content-Type: text/html\r\n" +
"\r\n" +
"<html><body>Hello World!</body></html>";
byte[] httpResponseBytes = httpResponse.getBytes("UTF-8");
IMessageProcessor messageProcessor = (request, writeProxy) -> {
System.out.println("Message Received from socket: " + request.socketId);
Message response = writeProxy.getMessage();
response.socketId = request.socketId;
response.writeToMessage(httpResponseBytes);
writeProxy.enqueue(response);
};
ServerCore serverCore = new ServerCore(inboundSocketQueue, readMessageBuffer, writeMessageBuffer, messageReaderFactory, messageWriterFactory, messageProcessor);
Thread serverCoreThread = new Thread(serverCore);
serverCoreThread.start();
}
}

View File

@ -0,0 +1,27 @@
package com.jenkov.nioserver.http;
/**
* Created by jjenkov on 19-10-2015.
*/
public class HttpHeaders {
public static int HTTP_METHOD_GET = 1;
public static int HTTP_METHOD_POST = 2;
public static int HTTP_METHOD_PUT = 3;
public static int HTTP_METHOD_HEAD = 4;
public static int HTTP_METHOD_DELETE = 5;
public int httpMethod = 0;
public int hostStartIndex = 0;
public int hostEndIndex = 0;
public int contentLength = 0;
public int bodyStartIndex = 0;
public int bodyEndIndex = 0;
}

View File

@ -0,0 +1,60 @@
package com.jenkov.nioserver.http;
import com.jenkov.nioserver.IMessageReader;
import com.jenkov.nioserver.Message;
import com.jenkov.nioserver.MessageBuffer;
import com.jenkov.nioserver.Socket;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* Created by jjenkov on 18-10-2015.
*/
public class HttpMessageReader implements IMessageReader {
private MessageBuffer messageBuffer = null;
private List<Message> completeMessages = new ArrayList<Message>();
private Message nextMessage = null;
public HttpMessageReader(MessageBuffer messageBuffer) {
this.messageBuffer = messageBuffer;
this.nextMessage = messageBuffer.getMessage();
this.nextMessage.metaData = new HttpHeaders();
}
@Override
public void read(Socket socket, ByteBuffer byteBuffer) throws IOException {
int bytesRead = socket.read(byteBuffer);
byteBuffer.flip();
if(byteBuffer.remaining() == 0){
byteBuffer.clear();
return;
}
this.nextMessage.writeToMessage(byteBuffer);
int endIndex = HttpUtil.parseHttpRequest(this.nextMessage.sharedArray, this.nextMessage.offset, this.nextMessage.offset + this.nextMessage.length, (HttpHeaders) this.nextMessage.metaData);
if(endIndex != -1){
Message message = this.messageBuffer.getMessage();
message.metaData = new HttpHeaders();
message.writePartialMessageToMessage(nextMessage, endIndex);
completeMessages.add(nextMessage);
nextMessage = message;
}
byteBuffer.clear();
}
@Override
public List<Message> getMessages() {
return this.completeMessages;
}
}

View File

@ -0,0 +1,22 @@
package com.jenkov.nioserver.http;
import com.jenkov.nioserver.IMessageReader;
import com.jenkov.nioserver.IMessageReaderFactory;
import com.jenkov.nioserver.MessageBuffer;
/**
* Created by jjenkov on 18-10-2015.
*/
public class HttpMessageReaderFactory implements IMessageReaderFactory {
private MessageBuffer messageBuffer = null;
public HttpMessageReaderFactory(MessageBuffer messageBuffer) {
this.messageBuffer = messageBuffer;
}
@Override
public IMessageReader createMessageReader() {
return new HttpMessageReader(this.messageBuffer);
}
}

View File

@ -0,0 +1,64 @@
package com.jenkov.nioserver.http;
import com.jenkov.nioserver.IMessageWriter;
import com.jenkov.nioserver.Message;
import com.jenkov.nioserver.SocketSwitch;
import com.jenkov.nioserver.Socket;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* Created by jjenkov on 21-10-2015.
*/
public class HttpMessageWriter implements IMessageWriter {
private List<Message> writeQueue = new ArrayList<>();
private Message messageInProgress = null;
private int bytesWritten = 0;
public HttpMessageWriter() {
}
@Override
public void enqueue(Message message) {
if(this.messageInProgress == null){
this.messageInProgress = message;
System.out.println("Message set as message in progress.");
} else {
this.writeQueue.add(message);
System.out.println("Message enqueued.");
}
//todo register socket for write interest
}
@Override
public void write(Socket socket, ByteBuffer byteBuffer) throws IOException {
System.out.println("Writing message to socket");
byteBuffer.put(this.messageInProgress.sharedArray, this.messageInProgress.offset + this.bytesWritten, this.messageInProgress.length - this.bytesWritten);
byteBuffer.flip();
this.bytesWritten += socket.write(byteBuffer);
byteBuffer.clear();
if(bytesWritten >= this.messageInProgress.length){
if(this.writeQueue.size() > 0){
this.messageInProgress = this.writeQueue.remove(0);
} else {
this.messageInProgress = null;
//todo unregister from selector
}
}
}
@Override
public boolean isEmpty() {
return this.writeQueue.isEmpty() && this.messageInProgress == null;
}
}

View File

@ -0,0 +1,18 @@
package com.jenkov.nioserver.http;
import com.jenkov.nioserver.IMessageWriter;
import com.jenkov.nioserver.IMessageWriterFactory;
import java.nio.channels.Selector;
/**
* Created by jjenkov on 19-10-2015.
*/
public class HttpMessageWriterFactory implements IMessageWriterFactory {
@Override
public IMessageWriter createMessageWriter() {
return new HttpMessageWriter();
//return null;
}
}

View File

@ -0,0 +1,155 @@
package com.jenkov.nioserver.http;
import java.io.UnsupportedEncodingException;
/**
* Created by jjenkov on 19-10-2015.
*/
public class HttpUtil {
private static final byte[] GET = new byte[]{'G','E','T'};
private static final byte[] POST = new byte[]{'P','O','S','T'};
private static final byte[] PUT = new byte[]{'P','U','T'};
private static final byte[] HEAD = new byte[]{'H','E','A','D'};
private static final byte[] DELETE = new byte[]{'D','E','L','E','T','E'};
private static final byte[] HOST = new byte[]{'H','o','s','t'};
private static final byte[] CONTENT_LENGTH = new byte[]{'C','o','n','t','e','n','t','-','L','e','n','g','t','h'};
public static int parseHttpRequest(byte[] src, int startIndex, int endIndex, HttpHeaders httpHeaders){
/*
int endOfHttpMethod = findNext(src, startIndex, endIndex, (byte) ' ');
if(endOfHttpMethod == -1) return false;
resolveHttpMethod(src, startIndex, httpHeaders);
*/
//parse HTTP request line
int endOfFirstLine = findNextLineBreak(src, startIndex, endIndex);
if(endOfFirstLine == -1) return -1;
//parse HTTP headers
int prevEndOfHeader = endOfFirstLine + 1;
int endOfHeader = findNextLineBreak(src, prevEndOfHeader, endIndex);
while(endOfHeader != -1 && endOfHeader != prevEndOfHeader + 1){ //prevEndOfHeader + 1 = end of previous header + 2 (+2 = CR + LF)
if(matches(src, prevEndOfHeader, CONTENT_LENGTH)){
try {
findContentLength(src, prevEndOfHeader, endIndex, httpHeaders);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
prevEndOfHeader = endOfHeader + 1;
endOfHeader = findNextLineBreak(src, prevEndOfHeader, endIndex);
}
if(endOfHeader == -1){
return -1;
}
//check that byte array contains full HTTP message.
int bodyStartIndex = endOfHeader + 1;
int bodyEndIndex = bodyStartIndex + httpHeaders.contentLength;
if(bodyEndIndex <= endIndex){
//byte array contains a full HTTP request
httpHeaders.bodyStartIndex = bodyStartIndex;
httpHeaders.bodyEndIndex = bodyEndIndex;
return bodyEndIndex;
}
return -1;
}
private static void findContentLength(byte[] src, int startIndex, int endIndex, HttpHeaders httpHeaders) throws UnsupportedEncodingException {
int indexOfColon = findNext(src, startIndex, endIndex, (byte) ':');
//skip spaces after colon
int index = indexOfColon +1;
while(src[index] == ' '){
index++;
}
int valueStartIndex = index;
int valueEndIndex = index;
boolean endOfValueFound = false;
while(index < endIndex && !endOfValueFound){
switch(src[index]){
case '0' : ;
case '1' : ;
case '2' : ;
case '3' : ;
case '4' : ;
case '5' : ;
case '6' : ;
case '7' : ;
case '8' : ;
case '9' : { index++; break; }
default: {
endOfValueFound = true;
valueEndIndex = index;
}
}
}
httpHeaders.contentLength = Integer.parseInt(new String(src, valueStartIndex, valueEndIndex - valueStartIndex, "UTF-8"));
}
public static int findNext(byte[] src, int startIndex, int endIndex, byte value){
for(int index = startIndex; index < endIndex; index++){
if(src[index] == value) return index;
}
return -1;
}
public static int findNextLineBreak(byte[] src, int startIndex, int endIndex) {
for(int index = startIndex; index < endIndex; index++){
if(src[index] == '\n'){
if(src[index - 1] == '\r'){
return index;
}
};
}
return -1;
}
public static void resolveHttpMethod(byte[] src, int startIndex, HttpHeaders httpHeaders){
if(matches(src, startIndex, GET)) {
httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_GET;
return;
}
if(matches(src, startIndex, POST)){
httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_POST;
return;
}
if(matches(src, startIndex, PUT)){
httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_PUT;
return;
}
if(matches(src, startIndex, HEAD)){
httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_HEAD;
return;
}
if(matches(src, startIndex, DELETE)){
httpHeaders.httpMethod = HttpHeaders.HTTP_METHOD_DELETE;
return;
}
}
public static boolean matches(byte[] src, int offset, byte[] value){
for(int i=offset, n=0; n < value.length; i++, n++){
if(src[i] != value[n]) return false;
}
return true;
}
}

View File

@ -0,0 +1,80 @@
package com.jenkov.nioserver;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
/**
* Created by jjenkov on 18-10-2015.
*/
public class MessageBufferTest {
@Test
public void testGetMessage() {
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
assertNotNull(message);
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(4 * 1024, message.capacity);
Message message2 = messageBuffer.getMessage();
assertNotNull(message2);
assertEquals(4096 , message2.offset);
assertEquals(0 , message2.length);
assertEquals(4 * 1024, message2.capacity);
//todo test what happens if the small buffer space is depleted of messages.
}
@Test
public void testExpandMessage(){
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
byte[] smallSharedArray = message.sharedArray;
assertNotNull(message);
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(4 * 1024, message.capacity);
messageBuffer.expandMessage(message);
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(128 * 1024, message.capacity);
byte[] mediumSharedArray = message.sharedArray;
assertNotSame(smallSharedArray, mediumSharedArray);
messageBuffer.expandMessage(message);
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(1024 * 1024, message.capacity);
byte[] largeSharedArray = message.sharedArray;
assertNotSame(smallSharedArray, largeSharedArray);
assertNotSame(mediumSharedArray, largeSharedArray);
//next expansion should not be possible.
assertFalse(messageBuffer.expandMessage(message));
assertEquals(0 , message.offset);
assertEquals(0 , message.length);
assertEquals(1024 * 1024, message.capacity);
assertSame(message.sharedArray, largeSharedArray);
}
}

View File

@ -0,0 +1,59 @@
package com.jenkov.nioserver;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import java.nio.ByteBuffer;
/**
* Created by jjenkov on 18-10-2015.
*/
public class MessageTest {
@Test
public void testWriteToMessage() {
MessageBuffer messageBuffer = new MessageBuffer();
Message message = messageBuffer.getMessage();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
fill(byteBuffer, 4096);
int written = message.writeToMessage(byteBuffer);
assertEquals(4096, written);
assertEquals(4096, message.length);
assertSame(messageBuffer.smallMessageBuffer, message.sharedArray);
fill(byteBuffer, 124 * 1024);
written = message.writeToMessage(byteBuffer);
assertEquals(124 * 1024, written);
assertEquals(128 * 1024, message.length);
assertSame(messageBuffer.mediumMessageBuffer, message.sharedArray);
fill(byteBuffer, (1024-128) * 1024);
written = message.writeToMessage(byteBuffer);
assertEquals(896 * 1024, written);
assertEquals(1024 * 1024, message.length);
assertSame(messageBuffer.largeMessageBuffer, message.sharedArray);
fill(byteBuffer, 1);
written = message.writeToMessage(byteBuffer);
assertEquals(-1, written);
}
private void fill(ByteBuffer byteBuffer, int length){
byteBuffer.clear();
for(int i=0; i<length; i++){
byteBuffer.put((byte) (i%128));
}
byteBuffer.flip();
}
}

View File

@ -0,0 +1,36 @@
package com.jenkov.nioserver;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
* Created by jjenkov on 21-10-2015.
*/
public class SelectorTest {
@Test
public void test() throws IOException {
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.bind(new InetSocketAddress("localhost", 9999));
socketChannel.configureBlocking(false);
SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_WRITE);
key1.cancel();
SelectionKey key2 = socketChannel.register(selector, SelectionKey.OP_WRITE);
key2.cancel();
}
}

View File

@ -0,0 +1,88 @@
package com.jenkov.nioserver.http;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
/**
* Created by jjenkov on 19-10-2015.
*/
public class HttpUtilTest {
@Test
public void testResolveHttpMethod() throws UnsupportedEncodingException {
assertHttpMethod("GET / HTTP/1.1\r\n" , HttpHeaders.HTTP_METHOD_GET);
assertHttpMethod("POST / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_POST);
assertHttpMethod("PUT / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_PUT);
assertHttpMethod("HEAD / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_HEAD);
assertHttpMethod("DELETE / HTTP/1.1\r\n", HttpHeaders.HTTP_METHOD_DELETE);
}
private void assertHttpMethod(String httpRequest, int httpMethod) throws UnsupportedEncodingException {
byte[] source = httpRequest.getBytes("UTF-8");
HttpHeaders httpHeaders = new HttpHeaders();
HttpUtil.resolveHttpMethod(source, 0, httpHeaders);
assertEquals(httpMethod, httpHeaders.httpMethod);
}
@Test
public void testParseHttpRequest() throws UnsupportedEncodingException {
String httpRequest =
"GET / HTTP/1.1\r\n\r\n";
byte[] source = httpRequest.getBytes("UTF-8");
HttpHeaders httpHeaders = new HttpHeaders();
HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders);
assertEquals(0, httpHeaders.contentLength);
httpRequest =
"GET / HTTP/1.1\r\n" +
"Content-Length: 5\r\n" +
"\r\n1234";
source = httpRequest.getBytes("UTF-8");
assertEquals(-1, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
httpRequest =
"GET / HTTP/1.1\r\n" +
"Content-Length: 5\r\n" +
"\r\n12345";
source = httpRequest.getBytes("UTF-8");
assertEquals(42, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
httpRequest =
"GET / HTTP/1.1\r\n" +
"Content-Length: 5\r\n" +
"\r\n12345" +
"GET / HTTP/1.1\r\n" +
"Content-Length: 5\r\n" +
"\r\n12345";
source = httpRequest.getBytes("UTF-8");
assertEquals(42, HttpUtil.parseHttpRequest(source, 0, source.length, httpHeaders));
assertEquals(5, httpHeaders.contentLength);
assertEquals(37, httpHeaders.bodyStartIndex);
assertEquals(42, httpHeaders.bodyEndIndex);
}
}