diff --git a/src/main/java/com/jenkov/nioserver/IMessageReader.java b/src/main/java/com/jenkov/nioserver/IMessageReader.java index 322b20c..19a6d1b 100644 --- a/src/main/java/com/jenkov/nioserver/IMessageReader.java +++ b/src/main/java/com/jenkov/nioserver/IMessageReader.java @@ -9,6 +9,8 @@ import java.util.List; */ public interface IMessageReader { + public void init(MessageBuffer readMessageBuffer); + public void read(Socket socket, ByteBuffer byteBuffer) throws IOException; public List getMessages(); diff --git a/src/main/java/com/jenkov/nioserver/IMessageReaderFactory.java b/src/main/java/com/jenkov/nioserver/IMessageReaderFactory.java index 0019f48..66bb80e 100644 --- a/src/main/java/com/jenkov/nioserver/IMessageReaderFactory.java +++ b/src/main/java/com/jenkov/nioserver/IMessageReaderFactory.java @@ -6,4 +6,5 @@ package com.jenkov.nioserver; public interface IMessageReaderFactory { public IMessageReader createMessageReader(); + } diff --git a/src/main/java/com/jenkov/nioserver/Server.java b/src/main/java/com/jenkov/nioserver/Server.java index f41ef80..7bf8aac 100644 --- a/src/main/java/com/jenkov/nioserver/Server.java +++ b/src/main/java/com/jenkov/nioserver/Server.java @@ -1,54 +1,46 @@ package com.jenkov.nioserver; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; /** - * Created by jjenkov on 19-10-2015. + * Created by jjenkov on 24-10-2015. */ -public class Server implements Runnable{ +public class Server { + + private SocketAccepter socketAccepter = null; + private SocketProcessor socketProcessor = null; private int tcpPort = 0; - private ServerSocketChannel serverSocket = null; + private IMessageReaderFactory messageReaderFactory = null; + private IMessageProcessor messageProcessor = null; - private Queue socketQueue = null; + public Server(int tcpPort, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) { + this.tcpPort = tcpPort; + this.messageReaderFactory = messageReaderFactory; + this.messageProcessor = messageProcessor; + } - public Server(int tcpPort, Queue socketQueue) { - this.tcpPort = tcpPort; - this.socketQueue = socketQueue; + public void start() throws IOException { + + Queue socketQueue = new ArrayBlockingQueue(1024); //move 1024 to ServerConfig + + this.socketAccepter = new SocketAccepter(tcpPort, socketQueue); + + + MessageBuffer readBuffer = new MessageBuffer(); + MessageBuffer writeBuffer = new MessageBuffer(); + + this.socketProcessor = new SocketProcessor(socketQueue, readBuffer, writeBuffer, this.messageReaderFactory, this.messageProcessor); + + Thread accepterThread = new Thread(this.socketAccepter); + Thread processorThread = new Thread(this.socketProcessor); + + accepterThread.start(); + processorThread.start(); } - - public void run() { - try{ - this.serverSocket = ServerSocketChannel.open(); - this.serverSocket.bind(new InetSocketAddress(tcpPort)); - } catch(IOException e){ - e.printStackTrace(); - return; - } - - - while(true){ - try{ - SocketChannel socketChannel = this.serverSocket.accept(); - - System.out.println("Socket accepted"); - - //todo check if the queue can even accept more sockets. - this.socketQueue.add(new Socket(socketChannel)); - - - } catch(IOException e){ - e.printStackTrace(); - } - - } - - } } diff --git a/src/main/java/com/jenkov/nioserver/Socket.java b/src/main/java/com/jenkov/nioserver/Socket.java index 6d33943..298e7bb 100644 --- a/src/main/java/com/jenkov/nioserver/Socket.java +++ b/src/main/java/com/jenkov/nioserver/Socket.java @@ -17,7 +17,6 @@ public class Socket { public boolean endOfStreamReached = false; - public Socket() { } @@ -40,7 +39,6 @@ public class Socket { return totalBytesRead; } - public int write(ByteBuffer byteBuffer) throws IOException{ int bytesWritten = this.socketChannel.write(byteBuffer); int totalBytesWritten = bytesWritten; diff --git a/src/main/java/com/jenkov/nioserver/SocketAccepter.java b/src/main/java/com/jenkov/nioserver/SocketAccepter.java new file mode 100644 index 0000000..138a39f --- /dev/null +++ b/src/main/java/com/jenkov/nioserver/SocketAccepter.java @@ -0,0 +1,53 @@ +package com.jenkov.nioserver; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Queue; + +/** + * Created by jjenkov on 19-10-2015. + */ +public class SocketAccepter implements Runnable{ + + private int tcpPort = 0; + private ServerSocketChannel serverSocket = null; + + private Queue socketQueue = null; + + public SocketAccepter(int tcpPort, Queue socketQueue) { + this.tcpPort = tcpPort; + this.socketQueue = socketQueue; + } + + + + public void run() { + try{ + this.serverSocket = ServerSocketChannel.open(); + this.serverSocket.bind(new InetSocketAddress(tcpPort)); + } catch(IOException e){ + e.printStackTrace(); + return; + } + + + while(true){ + try{ + SocketChannel socketChannel = this.serverSocket.accept(); + + System.out.println("Socket accepted: " + socketChannel); + + //todo check if the queue can even accept more sockets. + this.socketQueue.add(new Socket(socketChannel)); + + } catch(IOException e){ + e.printStackTrace(); + } + + } + + } +} diff --git a/src/main/java/com/jenkov/nioserver/ServerCore.java b/src/main/java/com/jenkov/nioserver/SocketProcessor.java similarity index 95% rename from src/main/java/com/jenkov/nioserver/ServerCore.java rename to src/main/java/com/jenkov/nioserver/SocketProcessor.java index 268c883..251fee6 100644 --- a/src/main/java/com/jenkov/nioserver/ServerCore.java +++ b/src/main/java/com/jenkov/nioserver/SocketProcessor.java @@ -10,7 +10,7 @@ import java.util.*; /** * Created by jjenkov on 16-10-2015. */ -public class ServerCore implements Runnable { +public class SocketProcessor implements Runnable { private Queue inboundSocketQueue = null; @@ -37,7 +37,7 @@ public class ServerCore implements Runnable { private Set nonEmptyToEmptySockets = new HashSet<>(); - public ServerCore(Queue inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException { + public SocketProcessor(Queue inboundSocketQueue, MessageBuffer readMessageBuffer, MessageBuffer writeMessageBuffer, IMessageReaderFactory messageReaderFactory, IMessageProcessor messageProcessor) throws IOException { this.inboundSocketQueue = inboundSocketQueue; this.readMessageBuffer = readMessageBuffer; @@ -84,6 +84,8 @@ public class ServerCore implements Runnable { newSocket.socketChannel.configureBlocking(false); newSocket.messageReader = this.messageReaderFactory.createMessageReader(); + newSocket.messageReader.init(this.readMessageBuffer); + newSocket.messageWriter = new MessageWriter(); this.socketMap.put(newSocket.socketId, newSocket); diff --git a/src/main/java/com/jenkov/nioserver/example/Main.java b/src/main/java/com/jenkov/nioserver/example/Main.java index 8182fa6..238c8ea 100644 --- a/src/main/java/com/jenkov/nioserver/example/Main.java +++ b/src/main/java/com/jenkov/nioserver/example/Main.java @@ -13,16 +13,6 @@ import java.util.concurrent.BlockingQueue; public class Main { public static void main(String[] args) throws IOException { - BlockingQueue inboundSocketQueue = new ArrayBlockingQueue(1024); - - Server server = new Server(9999, inboundSocketQueue); - - Thread serverThread = new Thread(server); - serverThread.start(); - - MessageBuffer readMessageBuffer = new MessageBuffer(); - MessageBuffer writeMessageBuffer = new MessageBuffer(); - IMessageReaderFactory messageReaderFactory = new HttpMessageReaderFactory(readMessageBuffer); String httpResponse = "HTTP/1.1 200 OK\r\n" + "Content-Length: 38\r\n" + @@ -42,9 +32,10 @@ public class Main { writeProxy.enqueue(response); }; - ServerCore serverCore = new ServerCore(inboundSocketQueue, readMessageBuffer, writeMessageBuffer, messageReaderFactory, messageProcessor); - Thread serverCoreThread = new Thread(serverCore); - serverCoreThread.start(); + Server server = new Server(9999, new HttpMessageReaderFactory(), messageProcessor); + + server.start(); + } diff --git a/src/main/java/com/jenkov/nioserver/http/HttpMessageReader.java b/src/main/java/com/jenkov/nioserver/http/HttpMessageReader.java index 41ce706..cf3a6e1 100644 --- a/src/main/java/com/jenkov/nioserver/http/HttpMessageReader.java +++ b/src/main/java/com/jenkov/nioserver/http/HttpMessageReader.java @@ -20,8 +20,12 @@ public class HttpMessageReader implements IMessageReader { private List completeMessages = new ArrayList(); private Message nextMessage = null; - public HttpMessageReader(MessageBuffer messageBuffer) { - this.messageBuffer = messageBuffer; + public HttpMessageReader() { + } + + @Override + public void init(MessageBuffer readMessageBuffer) { + this.messageBuffer = readMessageBuffer; this.nextMessage = messageBuffer.getMessage(); this.nextMessage.metaData = new HttpHeaders(); } diff --git a/src/main/java/com/jenkov/nioserver/http/HttpMessageReaderFactory.java b/src/main/java/com/jenkov/nioserver/http/HttpMessageReaderFactory.java index e2fe3da..e855aed 100644 --- a/src/main/java/com/jenkov/nioserver/http/HttpMessageReaderFactory.java +++ b/src/main/java/com/jenkov/nioserver/http/HttpMessageReaderFactory.java @@ -9,14 +9,11 @@ import com.jenkov.nioserver.MessageBuffer; */ public class HttpMessageReaderFactory implements IMessageReaderFactory { - private MessageBuffer messageBuffer = null; - - public HttpMessageReaderFactory(MessageBuffer messageBuffer) { - this.messageBuffer = messageBuffer; + public HttpMessageReaderFactory() { } @Override public IMessageReader createMessageReader() { - return new HttpMessageReader(this.messageBuffer); + return new HttpMessageReader(); } }