package envoy.client.net; import java.io.Closeable; import java.io.IOException; import java.net.Socket; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; import envoy.client.data.Cache; import envoy.client.data.ClientConfig; import envoy.client.data.LocalDB; import envoy.client.event.SendEvent; import envoy.data.*; import envoy.event.*; import envoy.event.contact.ContactOperationEvent; import envoy.event.contact.ContactSearchResult; import envoy.util.EnvoyLog; import envoy.util.SerializationUtils; /** * Establishes a connection to the server, performs a handshake and delivers * certain objects to the server.
*
* Project: envoy-client
* File: Client.java
* Created: 28 Sep 2019
*
* @author Kai S. K. Engelbart
* @author Maximilian Käfer
* @author Leon Hofmeister
* @since Envoy Client v0.1-alpha
*/
public class Client implements Closeable {

    /** Upper bound, in milliseconds, for waiting on the handshake response. */
    private static final long HANDSHAKE_TIMEOUT_MILLIS = 5000;

    /** Pause, in milliseconds, between two checks for the handshake response. */
    private static final long HANDSHAKE_POLL_INTERVAL_MILLIS = 500;

    // Configuration, logging and event management
    private static final ClientConfig config   = ClientConfig.getInstance();
    private static final Logger       logger   = EnvoyLog.getLogger(Client.class);
    private static final EventBus     eventBus = EventBus.getInstance();

    // Connection handling
    private Socket   socket;
    private Receiver receiver;
    private boolean  online;

    // Asynchronously initialized during handshake.
    // NOTE(review): Set, Map and Cache are used as raw types in this class, yet
    // getUsers() calls getName() on the elements of the contact set, which needs
    // a parameterized element type - confirm the intended type arguments.
    private volatile User    sender;
    private volatile Set     contacts;
    private volatile boolean rejected;

    /**
     * Enters the online mode by acquiring a user ID from the server. As a
     * connection has to be established and a handshake has to be made, this method
     * will block for roughly up to 5 seconds. If the handshake does exceed this
     * time limit, an exception is thrown.
     * <p>
     * If the server rejects the handshake, the socket is closed, the handshake
     * processors are removed and the method returns normally with the client
     * still offline; it can then be called again. If the handshake fails with an
     * exception (timeout, I/O error, interruption), the socket is closed and the
     * handshake processors are removed before the exception is propagated, so
     * that no connection is leaked.
     *
     * @param credentials          the login credentials of the user
     * @param receivedMessageCache a message cache that is registered as the
     *                             processor for messages arriving during the
     *                             handshake
     * @throws TimeoutException      if no user object was received in time
     * @throws IOException           if the connection could not be established
     *                               or the login credentials could not be
     *                               written
     * @throws InterruptedException  if the current thread is interrupted while
     *                               waiting for the handshake response
     * @throws IllegalStateException if the client is already online
     */
    public void performHandshake(LoginCredentials credentials, Cache receivedMessageCache)
            throws TimeoutException, IOException, InterruptedException {
        if (online) {
            throw new IllegalStateException("Handshake has already been performed successfully");
        }

        // Establish TCP connection
        logger.finer(String.format("Attempting connection to server %s:%d...", config.getServer(), config.getPort()));
        socket = new Socket(config.getServer(), config.getPort());
        logger.fine("Successfully established TCP connection to server");

        try {
            // Create message receiver
            receiver = new Receiver(socket.getInputStream());

            // Register user creation processor, contact list processor and message cache
            receiver.registerProcessor(User.class, user -> {
                this.sender = user;
                this.contacts = user.getContacts();
            });
            receiver.registerProcessor(Message.class, receivedMessageCache);
            receiver.registerProcessor(HandshakeRejectionEvent.class, evt -> {
                rejected = true;
                eventBus.dispatch(evt);
            });

            rejected = false;

            // Start receiver
            receiver.start();

            // Write login credentials
            SerializationUtils.writeBytesWithLength(credentials, socket.getOutputStream());

            // Poll until the receiver has delivered the sender object
            final long start = System.currentTimeMillis();
            while (sender == null) {

                // Quit immediately after handshake rejection.
                // This method can then be called again.
                if (rejected) {
                    socket.close();
                    receiver.removeAllProcessors();
                    return;
                }

                if (System.currentTimeMillis() - start > HANDSHAKE_TIMEOUT_MILLIS) {
                    throw new TimeoutException("Did not log in after 5 seconds");
                }

                Thread.sleep(HANDSHAKE_POLL_INTERVAL_MILLIS);
            }
        } catch (TimeoutException | IOException | InterruptedException | RuntimeException e) {
            // Do not leave an open socket or stale processors behind
            abortHandshake(e);
            throw e;
        }

        online = true;

        // Remove the processors that were only needed for the handshake
        receiver.removeAllProcessors();

        logger.info("Handshake completed.");
    }

    /**
     * Releases the resources of a failed handshake attempt: closes the socket
     * and removes all processors from the receiver, if one has been created.
     * A failure to close the socket is attached to {@code cause} as a suppressed
     * exception so that the original failure is the one that gets reported.
     *
     * @param cause the exception that made the handshake fail
     */
    private void abortHandshake(Exception cause) {
        try {
            socket.close();
        } catch (IOException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
        if (receiver != null) {
            receiver.removeAllProcessors();
        }
    }

    /**
     * Initializes the {@link Receiver} used to process data sent from the server to
     * this client by registering a processor for every supported object type.
     * Additionally registers a {@link SendEvent} handler on the event bus that
     * forwards the wrapped event to the server.
     *
     * @param localDB              the local database used to persist the current
     *                             {@link IDGenerator}
     * @param receivedMessageCache a message cache containing all unread messages
     *                             from the server that can be relayed after
     *                             initialization
     * @throws IOException           if no {@link IDGenerator} is present and none
     *                               could be requested from the server
     * @throws IllegalStateException if the client is not online
     * @since Envoy Client v0.2-alpha
     */
    public void initReceiver(LocalDB localDB, Cache receivedMessageCache) throws IOException {
        checkOnline();

        // Process incoming messages
        final ReceivedMessageProcessor receivedMessageProcessor = new ReceivedMessageProcessor();
        receiver.registerProcessor(Message.class, receivedMessageProcessor);

        // Relay cached unread messages
        receivedMessageCache.setProcessor(receivedMessageProcessor);

        // Process message status changes
        receiver.registerProcessor(MessageStatusChangeEvent.class, new MessageStatusChangeEventProcessor());

        // Process user status changes
        receiver.registerProcessor(UserStatusChangeEvent.class, new UserStatusChangeProcessor(localDB));

        // Process message ID generation
        receiver.registerProcessor(IDGenerator.class, localDB::setIDGenerator);

        // Process name changes
        receiver.registerProcessor(NameChangeEvent.class, evt -> {
            localDB.replaceContactName(evt);
            eventBus.dispatch(evt);
        });

        // Process contact searches
        receiver.registerProcessor(ContactSearchResult.class, eventBus::dispatch);

        // A received contact is announced as an ADD operation on the first element
        // of its contact collection
        receiver.registerProcessor(Contact.class,
                contact -> eventBus.dispatch(new ContactOperationEvent(contact.getContacts().iterator().next(), ElementOperation.ADD)));

        // Process group size changes
        receiver.registerProcessor(GroupResizeEvent.class, evt -> {
            localDB.updateGroup(evt);
            eventBus.dispatch(evt);
        });

        // Send event
        eventBus.register(SendEvent.class, evt -> {
            try {
                sendEvent(evt.get());
            } catch (IOException e) {
                // The handler cannot propagate the checked exception, so report it
                // through the logger together with its stack trace
                logger.log(java.util.logging.Level.WARNING, "Failed to send event to the server", e);
            }
        });

        // Request a generator if none is present or the existing one is consumed
        if (!localDB.hasIDGenerator() || !localDB.getIDGenerator().hasNext()) {
            requestIdGenerator();
        }
    }

    /**
     * Creates a new write proxy that uses this client to communicate with the
     * server.
     *
     * @param localDB the local database that the write proxy will use to access
     *                caches
     * @return a new write proxy
     * @since Envoy Client v0.3-alpha
     */
    public WriteProxy createWriteProxy(LocalDB localDB) {
        return new WriteProxy(this, localDB);
    }

    /**
     * Sends a message to the server. The message's status is advanced only after
     * the message has been written without an exception.
     *
     * @param message the message to send
     * @throws IOException           if the message does not reach the server
     * @throws IllegalStateException if the client is not online
     * @since Envoy Client v0.3-alpha
     */
    public void sendMessage(Message message) throws IOException {
        writeObject(message);
        message.nextStatus();
    }

    /**
     * Sends an event to the server.
     *
     * @param evt the event to send
     * @throws IOException           if the event did not reach the server
     * @throws IllegalStateException if the client is not online
     */
    public void sendEvent(Event evt) throws IOException {
        writeObject(evt);
    }

    /**
     * Requests a new {@link IDGenerator} from the server.
     *
     * @throws IOException           if the request does not reach the server
     * @throws IllegalStateException if the client is not online
     * @since Envoy Client v0.3-alpha
     */
    public void requestIdGenerator() throws IOException {
        logger.info("Requesting new id generator...");
        writeObject(new IDGeneratorRequest());
    }

    /**
     * Builds a new map from the current contacts and the sender of this client.
     *
     * @return a {@code Map} of all users on the server with their
     *         user names as keys
     * @throws IllegalStateException if the client is not online
     * @since Envoy Client v0.2-alpha
     */
    public Map getUsers() {
        checkOnline();
        Map users = new HashMap<>();
        contacts.forEach(u -> users.put(u.getName(), u));
        users.put(sender.getName(), sender);
        return users;
    }

    /**
     * Closes the socket if this client is online; does nothing otherwise.
     *
     * @throws IOException if closing the socket fails
     */
    @Override
    public void close() throws IOException {
        if (online) {
            socket.close();
        }
    }

    /**
     * Writes an object to the socket's output stream.
     *
     * @param obj the object to send
     * @throws IOException           if writing fails
     * @throws IllegalStateException if the client is not online
     */
    private void writeObject(Object obj) throws IOException {
        checkOnline();
        logger.fine("Sending " + obj);
        SerializationUtils.writeBytesWithLength(obj, socket.getOutputStream());
    }

    /**
     * Guards operations that need a completed handshake.
     *
     * @throws IllegalStateException if the client is not online
     */
    private void checkOnline() {
        if (!online) {
            throw new IllegalStateException("Client is not online");
        }
    }

    /**
     * @return the sender object that represents this client.
     * @since Envoy Client v0.1-alpha
     */
    public User getSender() {
        return sender;
    }

    /**
     * Sets the client user which is used to send messages.
     *
     * @param clientUser the client user to set
     * @since Envoy Client v0.2-alpha
     */
    public void setSender(User clientUser) {
        this.sender = clientUser;
    }

    /**
     * @return the {@link Receiver} used by this {@link Client}
     */
    public Receiver getReceiver() {
        return receiver;
    }

    /**
     * @return {@code true} if the handshake with the server has been completed
     *         successfully
     * @since Envoy Client v0.2-alpha
     */
    public boolean isOnline() {
        return online;
    }

    /**
     * @return the contacts of this {@link Client}
     * @since Envoy Client v0.3-alpha
     */
    public Set getContacts() {
        return contacts;
    }

    /**
     * @param contacts the contacts to set
     * @since Envoy Client v0.3-alpha
     */
    public void setContacts(Set contacts) {
        this.contacts = contacts;
    }
}