Added contact initialization to handshake and ReceivedMessageProcessor

This commit is contained in:
Kai S. K. Engelbart 2020-01-02 17:11:41 +02:00
parent 96066863ca
commit 5bbf8c3503
4 changed files with 63 additions and 170 deletions

View File

@ -7,11 +7,10 @@ import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import javax.naming.TimeLimitExceededException;
import envoy.client.util.EnvoyLog;
import envoy.data.LoginCredentials;
import envoy.data.Message;
import envoy.data.User;
import envoy.exception.EnvoyException;
import envoy.data.*;
import envoy.util.SerializationUtils;
/**
@ -33,12 +32,17 @@ public class Client implements Closeable {
private volatile User sender;
private User recipient;
private volatile Contacts contacts;
private Config config = Config.getInstance();
private static final Logger logger = EnvoyLog.getLogger(Client.class.getSimpleName());
/**
* Enters the online mode by acquiring a user ID from the server.
* Enters the online mode by acquiring a user ID from the server. As a
* connection has to be established and a handshake has to be made, this method
* will block for up to 5 seconds. If the handshake does exceed this time limit,
* an exception is thrown.
*
* @param credentials the login credentials of the user
* @throws Exception if the online mode could not be entered or the request
@ -56,6 +60,7 @@ public class Client implements Closeable {
// Register user creation processor
receiver.registerProcessor(User.class, sender -> { logger.info("Acquired user object " + sender); this.sender = sender; });
receiver.registerProcessor(Contacts.class, contacts -> { logger.info("Acquired contacts object " + contacts); this.contacts = contacts; });
// Start receiver
new Thread(receiver).start();
@ -66,12 +71,19 @@ public class Client implements Closeable {
// Wait for a maximum of five seconds to acquire the sender object
long start = System.currentTimeMillis();
while (sender == null) {
if (System.currentTimeMillis() - start > 5000) throw new EnvoyException("Did not log in after 5 seconds");
while (sender == null || contacts == null) {
if (System.currentTimeMillis() - start > 5000) throw new TimeLimitExceededException("Did not log in after 5 seconds");
Thread.sleep(500);
}
logger.info("Handshake completed.");
online = true;
// Remove user creation processor
receiver.removeAllProcessors();
// Register processors for message and status handling
receiver.registerProcessor(Message.class, new ReceivedMessageProcessor());
}
/**
@ -84,6 +96,7 @@ public class Client implements Closeable {
public void sendMessage(Message message) throws IOException {
checkOnline();
SerializationUtils.writeBytesWithLength(message, socket.getOutputStream());
message.nextStatus();
}
/**
@ -94,7 +107,7 @@ public class Client implements Closeable {
public Map<String, User> getUsers() {
checkOnline();
Map<String, User> users = new HashMap<>();
sender.getContacts().forEach(u -> users.put(u.getName(), u));
contacts.getContacts().forEach(u -> users.put(u.getName(), u));
return users;
}

View File

@ -84,166 +84,6 @@ public class LocalDB {
*/
public void loadChats() throws ClassNotFoundException, IOException { chats = SerializationUtils.read(localDBFile, ArrayList.class); }
// /**
// * Creates a {@link Sync} object filled with the changes that occurred to the
// * local database since the last synchronization.
// *
// * @param userId the ID of the user that is synchronized by this client
// * @return {@link Sync} object filled with the current changes
// * @since Envoy v0.1-alpha
// */
// public Sync fillSync(long userId) {
// addWaitingMessagesToSync();
//
// sync.getMessages().addAll(readMessages.getMessages());
// readMessages.getMessages().clear();
//
// logger.finest(String.format("Filled sync with %d messages.",
// sync.getMessages().size()));
// return sync;
// }
//
// /**
// * Applies the changes carried by a {@link Sync} object to the local database
// *
// * @param returnSync the {@link Sync} object to apply
// * @since Envoy v0.1-alpha
// */
// public void applySync(Sync returnSync) {
// for (int i = 0; i < returnSync.getMessages().size(); i++) {
//
// // The message has an ID
// if (returnSync.getMessages().get(i).getMetadata().getMessageId() != 0) {
//
// // Messages are processes differently corresponding to their state
// switch (returnSync.getMessages().get(i).getMetadata().getState()) {
// case SENT:
// // Update previously waiting and now sent messages that were assigned an ID
// by
// // the server
// sync.getMessages().get(i).getMetadata().setMessageId(returnSync.getMessages().get(i).getMetadata().getMessageId());
// sync.getMessages().get(i).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState());
// break;
// case RECEIVED:
// if (returnSync.getMessages().get(i).getMetadata().getSender() != 0) {
// // these are the unread Messages from the server
// unreadMessagesSync.getMessages().add(returnSync.getMessages().get(i));
//
// // Create and dispatch message creation event
// EventBus.getInstance().dispatch(new
// MessageCreationEvent(returnSync.getMessages().get(i)));
// } else {
// // Update Messages in localDB to state RECEIVED
// for (Chat chat : getChats())
// if (chat.getRecipient().getId() ==
// returnSync.getMessages().get(i).getMetadata().getRecipient())
// for (int j = 0; j < chat.getModel().getSize(); j++)
// if (chat.getModel().get(j).getMetadata().getMessageId() ==
// returnSync.getMessages()
// .get(i)
// .getMetadata()
// .getMessageId())
// chat.getModel().get(j).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState());
// }
// break;
// case READ:
// // Update local Messages to state READ
// logger.info("Message with ID: " +
// returnSync.getMessages().get(i).getMetadata().getMessageId()
// + "was initialized to be set to READ in localDB.");
// for (Chat chat : getChats())
// if (chat.getRecipient().getId() ==
// returnSync.getMessages().get(i).getMetadata().getRecipient()) {
// logger.info("Chat with: " + chat.getRecipient().getId() + "was selected.");
// for (int k = 0; k < chat.getModel().getSize(); k++)
// if (chat.getModel().get(k).getMetadata().getMessageId() ==
// returnSync.getMessages()
// .get(i)
// .getMetadata()
// .getMessageId()) {
// logger.info("Message with ID: " +
// chat.getModel().get(k).getMetadata().getMessageId() + "was selected.");
// chat.getModel().get(k).getMetadata().setState(returnSync.getMessages().get(i).getMetadata().getState());
// logger.info("Message State is now: " +
// chat.getModel().get(k).getMetadata().getState());
// }
// }
// break;
// }
// }
// }
//
// // Updating UserStatus of all users in LocalDB
// for (User user : returnSync.getUsers())
// for (Chat chat : getChats())
// if (user.getId() == chat.getRecipient().getId())
// chat.getRecipient().setStatus(user.getStatus());
//
// sync.getMessages().clear();
// sync.getUsers().clear();
// }
//
// /**
// * Adds the unread messages returned from the server in the latest sync to the
// * right chats in the LocalDB.
// *
// * @since Envoy v0.1-alpha
// */
// public void addUnreadMessagesToLocalDB() {
// for (Message message : unreadMessagesSync.getMessages())
// for (Chat chat : getChats())
// if (message.getMetadata().getSender() == chat.getRecipient().getId()) {
// chat.appendMessage(message);
// break;
// }
// }
//
// /**
// * Changes all messages with state {@code RECEIVED} of a specific chat to
// state
// * {@code READ}.
// * <br>
// * Adds these messages to the {@code readMessages} {@link Sync} object.
// *
// * @param currentChat the {@link Chat} that was just opened
// * @since Envoy v0.1-alpha
// */
// public void setMessagesToRead(Chat currentChat) {
// for (int i = currentChat.getModel().size() - 1; i >= 0; --i)
// if (currentChat.getModel().get(i).getMetadata().getRecipient() !=
// currentChat.getRecipient().getId())
// if (currentChat.getModel().get(i).getMetadata().getState() ==
// MessageState.RECEIVED) {
// currentChat.getModel().get(i).getMetadata().setState(MessageState.READ);
// readMessages.getMessages().add(currentChat.getModel().get(i));
// } else break;
// }
//
// /**
// * Adds all messages with state {@code WAITING} from the {@link LocalDB} to
// the
// * {@link Sync} object.
// *
// * @since Envoy v0.1-alpha
// */
// private void addWaitingMessagesToSync() {
// for (Chat chat : getChats())
// for (int i = 0; i < chat.getModel().size(); i++)
// if (chat.getModel().get(i).getMetadata().getState() == MessageState.WAITING)
// {
// logger.info("Got Waiting Message");
// sync.getMessages().add(chat.getModel().get(i));
// }
// }
//
// /**
// * Clears the {@code unreadMessagesSync} {@link Sync} object.
// *
// * @since Envoy v0.1-alpha
// */
// public void clearUnreadMessagesSync() {
// unreadMessagesSync.getMessages().clear(); }
/**
* @return a {@code Map<String, User>} of all users stored locally with their
* user names as keys

View File

@ -0,0 +1,32 @@
package envoy.client;
import java.util.function.Consumer;
import java.util.logging.Logger;
import envoy.client.event.MessageCreationEvent;
import envoy.client.util.EnvoyLog;
import envoy.data.Message;
import envoy.data.Message.MessageStatus;
import envoy.event.EventBus;
/**
* Project: <strong>envoy-client</strong><br>
* File: <strong>ReceivedMessageProcessor.java</strong><br>
* Created: <strong>31.12.2019</strong><br>
*
* @author Kai S. K. Engelbart
* @since Envoy v0.3-alpha
*/
public class ReceivedMessageProcessor implements Consumer<Message> {
private static final Logger logger = EnvoyLog.getLogger(ReceivedMessageProcessor.class.getSimpleName());
@Override
public void accept(Message message) {
logger.info("Received message object " + message);
if (message.getStatus() != MessageStatus.SENT) logger.warning("The message has the unexpected status " + message.getStatus());
else
// Dispatch event
EventBus.getInstance().dispatch(new MessageCreationEvent(message));
}
}

View File

@ -2,6 +2,7 @@ package envoy.client;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
@ -20,8 +21,8 @@ import envoy.client.util.EnvoyLog;
*/
public class Receiver implements Runnable {
private InputStream in;
private Map<Class<?>, Consumer<?>> processors = new HashMap<>();
private final InputStream in;
private final Map<Class<?>, Consumer<?>> processors = new HashMap<>();
private static final Logger logger = EnvoyLog.getLogger(Receiver.class.getSimpleName());
@ -47,6 +48,8 @@ public class Receiver implements Runnable {
logger.severe(String.format("The received object has the class %s for which no processor is defined.", obj.getClass()));
else processor.accept(obj);
}
} catch (SocketException e) {
logger.info("Connection probably closed by client. Exiting receiver thread...");
} catch (Exception e) {
logger.log(Level.SEVERE, "Error on receiver thread", e);
}
@ -60,4 +63,9 @@ public class Receiver implements Runnable {
* @param processor the object processor
*/
public <T> void registerProcessor(Class<T> processorClass, Consumer<T> processor) { processors.put(processorClass, processor); }
/**
* Removes all object processors registered at this {@link Receiver}.
*/
public void removeAllProcessors() { processors.clear(); }
}