This repository has been archived on 2021-12-05. You can view files and clone it, but cannot push or open issues or pull requests.
envoy/src/main/java/envoy/client/net/Client.java

298 lines
11 KiB
Java
Raw Normal View History

package envoy.client.net;
2019-10-06 10:45:19 +02:00
import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
2020-06-13 22:36:52 +02:00
import java.util.logging.Level;
import java.util.logging.Logger;
import envoy.client.data.Cache;
import envoy.client.data.ClientConfig;
import envoy.client.data.LocalDB;
2020-02-11 18:15:15 +01:00
import envoy.client.event.SendEvent;
import envoy.data.*;
import envoy.event.*;
import envoy.event.contact.ContactOperation;
import envoy.event.contact.ContactSearchResult;
import envoy.util.EnvoyLog;
import envoy.util.SerializationUtils;
2019-10-06 10:45:19 +02:00
/**
* Establishes a connection to the server, performs a handshake and delivers
* certain objects to the server.<br>
* <br>
2019-10-06 10:45:19 +02:00
* Project: <strong>envoy-client</strong><br>
* File: <strong>Client.java</strong><br>
2019-10-06 10:45:19 +02:00
* Created: <strong>28 Sep 2019</strong><br>
*
* @author Kai S. K. Engelbart
* @author Maximilian K&auml;fer
* @author Leon Hofmeister
* @since Envoy Client v0.1-alpha
2019-10-06 10:45:19 +02:00
*/
public class Client implements Closeable {
// Connection handling
private Socket socket;
private Receiver receiver;
private boolean online;
2019-10-06 10:45:19 +02:00
// Asynchronously initialized during handshake
private volatile User sender;
private volatile Set<? extends Contact> contacts;
private volatile boolean rejected;
2019-10-06 10:45:19 +02:00
// Configuration, logging and event management
private static final ClientConfig config = ClientConfig.getInstance();
private static final Logger logger = EnvoyLog.getLogger(Client.class);
private static final EventBus eventBus = EventBus.getInstance();
/**
* Enters the online mode by acquiring a user ID from the server. As a
* connection has to be established and a handshake has to be made, this method
* will block for up to 5 seconds. If the handshake does exceed this time limit,
* an exception is thrown.
*
* @param credentials the login credentials of the user
* @param receivedMessageCache a message cache containing all unread
* messages
* from the server that can be relayed
* after
* initialization
* @param receivedMessageStatusChangeCache an event cache containing all
* received messageStatusChangeEvents
* from the server that can be relayed
* after initialization
* @throws TimeoutException if the server could not be reached
2020-06-13 22:36:52 +02:00
* @throws IOException if the login credentials could not be written
* @throws InterruptedException if the current thread is interrupted while
* waiting for the handshake response
*/
public void performHandshake(LoginCredentials credentials, Cache<Message> receivedMessageCache,
Cache<MessageStatusChange> receivedMessageStatusChangeCache)
throws TimeoutException, IOException, InterruptedException {
if (online) throw new IllegalStateException("Handshake has already been performed successfully");
// Establish TCP connection
2020-06-13 22:36:52 +02:00
logger.log(Level.FINER, String.format("Attempting connection to server %s:%d...", config.getServer(), config.getPort()));
socket = new Socket(config.getServer(), config.getPort());
2020-06-13 22:36:52 +02:00
logger.log(Level.FINE, "Successfully established TCP connection to server");
2020-06-09 15:41:01 +02:00
// Create object receiver
receiver = new Receiver(socket.getInputStream());
// Register user creation processor, contact list processor and message cache
receiver.registerProcessor(User.class, sender -> { this.sender = sender; contacts = sender.getContacts(); });
receiver.registerProcessor(Message.class, receivedMessageCache);
receiver.registerProcessor(MessageStatusChange.class, receivedMessageStatusChangeCache);
receiver.registerProcessor(HandshakeRejection.class, evt -> { rejected = true; eventBus.dispatch(evt); });
rejected = false;
// Start receiver
receiver.start();
// Write login credentials
SerializationUtils.writeBytesWithLength(credentials, socket.getOutputStream());
// Wait for a maximum of five seconds to acquire the sender object
final long start = System.currentTimeMillis();
while (sender == null) {
// Quit immediately after handshake rejection
// This method can then be called again
if (rejected) {
socket.close();
receiver.removeAllProcessors();
return;
}
if (System.currentTimeMillis() - start > 5000) throw new TimeoutException("Did not log in after 5 seconds");
Thread.sleep(500);
}
online = true;
2020-06-09 15:41:01 +02:00
// Remove all processors as they are only used during the handshake
receiver.removeAllProcessors();
2020-06-13 22:36:52 +02:00
logger.log(Level.INFO, "Handshake completed.");
}
/**
* Initializes the {@link Receiver} used to process data sent from the server to
* this client.
*
* @param localDB the local database used to persist
* the current
* {@link IDGenerator}
* @param receivedMessageCache a message cache containing all unread
* messages
* from the server that can be relayed
* after
* initialization
* @param receivedMessageStatusChangeCache an event cache containing all
* received messageStatusChangeEvents
* from the server that can be relayed
* after initialization
* @throws IOException if no {@link IDGenerator} is present and none could be
* requested from the server
* @since Envoy Client v0.2-alpha
*/
public void initReceiver(LocalDB localDB, Cache<Message> receivedMessageCache,
Cache<MessageStatusChange> receivedMessageStatusChangeCache) throws IOException {
checkOnline();
// Process incoming messages
2020-06-13 22:36:52 +02:00
final ReceivedMessageProcessor receivedMessageProcessor = new ReceivedMessageProcessor();
final MessageStatusChangeProcessor messageStatusChangeEventProcessor = new MessageStatusChangeProcessor();
receiver.registerProcessor(Message.class, receivedMessageProcessor);
// Relay cached unread messages
receivedMessageCache.setProcessor(receivedMessageProcessor);
// Process message status changes
receiver.registerProcessor(MessageStatusChange.class, messageStatusChangeEventProcessor);
receivedMessageStatusChangeCache.setProcessor(messageStatusChangeEventProcessor);
// Process user status changes
receiver.registerProcessor(UserStatusChange.class, eventBus::dispatch);
// Process message ID generation
receiver.registerProcessor(IDGenerator.class, localDB::setIDGenerator);
// Process name changes
receiver.registerProcessor(NameChange.class, evt -> { localDB.replaceContactName(evt); eventBus.dispatch(evt); });
// Process contact searches
receiver.registerProcessor(ContactSearchResult.class, eventBus::dispatch);
2020-06-09 17:06:40 +02:00
// Process contact operations
receiver.registerProcessor(ContactOperation.class, eventBus::dispatch);
2020-02-11 18:15:15 +01:00
// Process group size changes
receiver.registerProcessor(GroupResize.class, evt -> { localDB.updateGroup(evt); eventBus.dispatch(evt); });
2020-02-11 18:15:15 +01:00
// Send event
eventBus.register(SendEvent.class, evt -> {
2020-02-11 18:15:15 +01:00
try {
sendEvent(evt.get());
} catch (final IOException e) {
2020-02-11 18:15:15 +01:00
e.printStackTrace();
2020-06-13 22:36:52 +02:00
logger.log(Level.WARNING, "An error occurred when trying to send Event " + evt, e);
2020-02-11 18:15:15 +01:00
}
});
// Request a generator if none is present or the existing one is consumed
if (!localDB.hasIDGenerator() || !localDB.getIDGenerator().hasNext()) requestIdGenerator();
}
2019-11-09 13:25:18 +01:00
/**
* Creates a new write proxy that uses this client to communicate with the
* server.
*
* @param localDB the local database that the write proxy will use to access
* caches
* @return a new write proxy
* @since Envoy Client v0.3-alpha
*/
public WriteProxy createWriteProxy(LocalDB localDB) { return new WriteProxy(this, localDB); }
/**
* Sends a message to the server. The message's status will be incremented once
* it was delivered successfully.
*
* @param message the message to send
* @throws IOException if the message does not reach the server
* @since Envoy Client v0.3-alpha
*/
public void sendMessage(Message message) throws IOException {
writeObject(message);
message.nextStatus();
}
/**
* Sends an event to the server.
*
* @param evt the event to send
* @throws IOException if the event did not reach the server
*/
public void sendEvent(Event<?> evt) throws IOException { writeObject(evt); }
/**
* Requests a new {@link IDGenerator} from the server.
*
* @throws IOException if the request does not reach the server
* @since Envoy Client v0.3-alpha
*/
public void requestIdGenerator() throws IOException {
2020-06-13 22:36:52 +02:00
logger.log(Level.INFO, "Requesting new id generator...");
writeObject(new IDGeneratorRequest());
}
/**
* @return a {@code Map<String, User>} of all users on the server with their
* user names as keys
* @since Envoy Client v0.2-alpha
*/
public Map<String, Contact> getUsers() {
checkOnline();
final Map<String, Contact> users = new HashMap<>();
contacts.forEach(u -> users.put(u.getName(), u));
users.put(sender.getName(), sender);
return users;
}
@Override
public void close() throws IOException { if (online) socket.close(); }
private void writeObject(Object obj) throws IOException {
checkOnline();
2020-06-13 22:36:52 +02:00
logger.log(Level.FINE, "Sending " + obj);
SerializationUtils.writeBytesWithLength(obj, socket.getOutputStream());
}
private void checkOnline() { if (!online) throw new IllegalStateException("Client is not online"); }
/**
* @return the sender object that represents this client.
* @since Envoy Client v0.1-alpha
*/
public User getSender() { return sender; }
/**
* Sets the client user which is used to send messages.
*
* @param clientUser the client user to set
* @since Envoy Client v0.2-alpha
*/
2020-06-13 22:36:52 +02:00
public void setSender(User clientUser) { sender = clientUser; }
/**
* @return the {@link Receiver} used by this {@link Client}
*/
public Receiver getReceiver() { return receiver; }
/**
* @return {@code true} if a connection to the server could be established
* @since Envoy Client v0.2-alpha
*/
public boolean isOnline() { return online; }
/**
* @return the contacts of this {@link Client}
* @since Envoy Client v0.3-alpha
*/
public Set<? extends Contact> getContacts() { return contacts; }
/**
* @param contacts the contacts to set
* @since Envoy Client v0.3-alpha
*/
public void setContacts(Set<? extends Contact> contacts) { this.contacts = contacts; }
}