Server updated to have a single Server class which instantiates all the server components and can start the 2 server threads.

This commit is contained in:
Jakob Jenkov 2015-10-24 14:48:03 +02:00
parent 976ecb6af2
commit f9dfd56076
9 changed files with 103 additions and 63 deletions

View File

@ -9,6 +9,8 @@ import java.util.List;
*/ */
public interface IMessageReader { public interface IMessageReader {
public void init(MessageBuffer readMessageBuffer);
public void read(Socket socket, ByteBuffer byteBuffer) throws IOException; public void read(Socket socket, ByteBuffer byteBuffer) throws IOException;
public List<Message> getMessages(); public List<Message> getMessages();

View File

@ -6,4 +6,5 @@ package com.jenkov.nioserver;
public interface IMessageReaderFactory { public interface IMessageReaderFactory {
public IMessageReader createMessageReader(); public IMessageReader createMessageReader();
} }

View File

@ -1,54 +1,46 @@
package com.jenkov.nioserver; package com.jenkov.nioserver;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/** /**
* Created by jjenkov on 19-10-2015. * Created by jjenkov on 24-10-2015.
*/ */
public class Server implements Runnable{ public class Server {
private SocketAccepter socketAccepter = null;
private SocketProcessor socketProcessor = null;
private int tcpPort = 0; private int tcpPort = 0;
private ServerSocketChannel serverSocket = null; private IMessageReaderFactory messageReaderFactory = null;
private IMessageProcessor messageProcessor = null;
private Queue socketQueue = null; public Server(int tcpPort, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) {
this.tcpPort = tcpPort;
this.messageReaderFactory = messageReaderFactory;
this.messageProcessor = messageProcessor;
}
public Server(int tcpPort, Queue socketQueue) { public void start() throws IOException {
this.tcpPort = tcpPort;
this.socketQueue = socketQueue; Queue socketQueue = new ArrayBlockingQueue(1024); //move 1024 to ServerConfig
this.socketAccepter = new SocketAccepter(tcpPort, socketQueue);
MessageBuffer readBuffer = new MessageBuffer();
MessageBuffer writeBuffer = new MessageBuffer();
this.socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor);
Thread accepterThread = new Thread(this.socketAccepter);
Thread processorThread = new Thread(this.socketProcessor);
accepterThread.start();
processorThread.start();
} }
public void run() {
try{
this.serverSocket = ServerSocketChannel.open();
this.serverSocket.bind(new InetSocketAddress(tcpPort));
} catch(IOException e){
e.printStackTrace();
return;
}
while(true){
try{
SocketChannel socketChannel = this.serverSocket.accept();
System.out.println("Socket accepted");
//todo check if the queue can even accept more sockets.
this.socketQueue.add(new Socket(socketChannel));
} catch(IOException e){
e.printStackTrace();
}
}
}
} }

View File

@ -17,7 +17,6 @@ public class Socket {
public boolean endOfStreamReached = false; public boolean endOfStreamReached = false;
public Socket() { public Socket() {
} }
@ -40,7 +39,6 @@ public class Socket {
return totalBytesRead; return totalBytesRead;
} }
public int write(ByteBuffer byteBuffer) throws IOException{ public int write(ByteBuffer byteBuffer) throws IOException{
int bytesWritten = this.socketChannel.write(byteBuffer); int bytesWritten = this.socketChannel.write(byteBuffer);
int totalBytesWritten = bytesWritten; int totalBytesWritten = bytesWritten;

View File

@ -0,0 +1,53 @@
package com.jenkov.nioserver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
/**
* Created by jjenkov on 19-10-2015.
*/
public class SocketAccepter implements Runnable{
private int tcpPort = 0;
private ServerSocketChannel serverSocket = null;
private Queue socketQueue = null;
public SocketAccepter(int tcpPort, Queue socketQueue) {
this.tcpPort = tcpPort;
this.socketQueue = socketQueue;
}
public void run() {
try{
this.serverSocket = ServerSocketChannel.open();
this.serverSocket.bind(new InetSocketAddress(tcpPort));
} catch(IOException e){
e.printStackTrace();
return;
}
while(true){
try{
SocketChannel socketChannel = this.serverSocket.accept();
System.out.println("Socket accepted: " + socketChannel);
//todo check if the queue can even accept more sockets.
this.socketQueue.add(new Socket(socketChannel));
} catch(IOException e){
e.printStackTrace();
}
}
}
}

View File

@ -10,7 +10,7 @@ import java.util.*;
/** /**
* Created by jjenkov on 16-10-2015. * Created by jjenkov on 16-10-2015.
*/ */
public class ServerCore implements Runnable { public class SocketProcessor implements Runnable {
private Queue<Socket> inboundSocketQueue = null; private Queue<Socket> inboundSocketQueue = null;
@ -37,7 +37,7 @@ public class ServerCore implements Runnable {
private Set<Socket> nonEmptyToEmptySockets = new HashSet<>(); private Set<Socket> nonEmptyToEmptySockets = new HashSet<>();
public ServerCore(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException { public SocketProcessor(Queue<Socket> inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException {
this.inboundSocketQueue = inboundSocketQueue; this.inboundSocketQueue = inboundSocketQueue;
this.readMessageBuffer = readMessageBuffer; this.readMessageBuffer = readMessageBuffer;
@ -84,6 +84,8 @@ public class ServerCore implements Runnable {
newSocket.socketChannel.configureBlocking(false); newSocket.socketChannel.configureBlocking(false);
newSocket.messageReader = this.messageReaderFactory.createMessageReader(); newSocket.messageReader = this.messageReaderFactory.createMessageReader();
newSocket.messageReader.init(this.readMessageBuffer);
newSocket.messageWriter = new MessageWriter(); newSocket.messageWriter = new MessageWriter();
this.socketMap.put(newSocket.socketId, newSocket); this.socketMap.put(newSocket.socketId, newSocket);

View File

@ -13,16 +13,6 @@ import java.util.concurrent.BlockingQueue;
public class Main { public class Main {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
BlockingQueue inboundSocketQueue = new ArrayBlockingQueue(1024);
Server server = new Server(9999, inboundSocketQueue);
Thread serverThread = new Thread(server);
serverThread.start();
MessageBuffer readMessageBuffer = new MessageBuffer();
MessageBuffer writeMessageBuffer = new MessageBuffer();
IMessageReaderFactory messageReaderFactory = new HttpMessageReaderFactory(readMessageBuffer);
String httpResponse = "HTTP/1.1 200 OK\r\n" + String httpResponse = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 38\r\n" + "Content-Length: 38\r\n" +
@ -42,9 +32,10 @@ public class Main {
writeProxy.enqueue(response); writeProxy.enqueue(response);
}; };
ServerCore serverCore = new ServerCore(inboundSocketQueue, readMessageBuffer, writeMessageBuffer, messageReaderFactory, messageProcessor); Server server = new Server(9999, new HttpMessageReaderFactory(), messageProcessor);
Thread serverCoreThread = new Thread(serverCore);
serverCoreThread.start(); server.start();
} }

View File

@ -20,8 +20,12 @@ public class HttpMessageReader implements IMessageReader {
private List<Message> completeMessages = new ArrayList<Message>(); private List<Message> completeMessages = new ArrayList<Message>();
private Message nextMessage = null; private Message nextMessage = null;
public HttpMessageReader(MessageBuffer messageBuffer) { public HttpMessageReader() {
this.messageBuffer = messageBuffer; }
@Override
public void init(MessageBuffer readMessageBuffer) {
this.messageBuffer = readMessageBuffer;
this.nextMessage = messageBuffer.getMessage(); this.nextMessage = messageBuffer.getMessage();
this.nextMessage.metaData = new HttpHeaders(); this.nextMessage.metaData = new HttpHeaders();
} }

View File

@ -9,14 +9,11 @@ import com.jenkov.nioserver.MessageBuffer;
*/ */
public class HttpMessageReaderFactory implements IMessageReaderFactory { public class HttpMessageReaderFactory implements IMessageReaderFactory {
private MessageBuffer messageBuffer = null; public HttpMessageReaderFactory() {
public HttpMessageReaderFactory(MessageBuffer messageBuffer) {
this.messageBuffer = messageBuffer;
} }
@Override @Override
public IMessageReader createMessageReader() { public IMessageReader createMessageReader() {
return new HttpMessageReader(this.messageBuffer); return new HttpMessageReader();
} }
} }