Merge pull request #46 from informatik-ag-ngl/f/groupMessages

Group Messages
This commit is contained in:
Kai S. K. Engelbart 2020-07-08 19:16:45 +00:00 committed by GitHub
commit 96e413c0b4
17 changed files with 287 additions and 153 deletions

View File

@ -62,8 +62,10 @@ public class Startup {
final Server server = new Server(8080, ObjectMessageReader::new, final Server server = new Server(8080, ObjectMessageReader::new,
new ObjectMessageProcessor(Set.of(new LoginCredentialProcessor(), new ObjectMessageProcessor(Set.of(new LoginCredentialProcessor(),
new MessageProcessor(), new MessageProcessor(),
new GroupMessageProcessor(),
new GroupCreationProcessor(), new GroupCreationProcessor(),
new MessageStatusChangeProcessor(), new MessageStatusChangeProcessor(),
new GroupMessageStatusChangeProcessor(),
new UserStatusChangeProcessor(), new UserStatusChangeProcessor(),
new IDGeneratorRequestProcessor(), new IDGeneratorRequestProcessor(),
new ContactSearchProcessor(), new ContactSearchProcessor(),

View File

@ -1,6 +1,7 @@
package envoy.server.data; package envoy.server.data;
import java.util.Date; import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import javax.persistence.*; import javax.persistence.*;
@ -16,14 +17,29 @@ import envoy.data.Group;
* @since Envoy Server Standalone v0.1-beta * @since Envoy Server Standalone v0.1-beta
*/ */
@Entity @Entity
@NamedQuery(
name = GroupMessage.getPendingGroupMsg,
query = "SELECT m FROM GroupMessage m JOIN m.memberMessageStatus s WHERE (KEY(s) = :userId) AND ((m.creationDate > :lastSeen)"
+ "OR ((m.status = envoy.data.Message$MessageStatus.RECEIVED) AND (m.receivedDate > :lastSeen))"
+ "OR ((m.status = envoy.data.Message$MessageStatus.READ) AND (m.readDate > :lastSeen))"
+ "OR ((m.lastStatusChangeDate > :lastSeen)))"
)
public class GroupMessage extends Message { public class GroupMessage extends Message {
/**
* Named query retrieving pending group messages sent to a group containing a
* specific user (parameter {@code userId}) that were sent after a certain time
* stamp (parameter {@code :lastSeen}).
*
* @since Envoy Server Standalone v0.1-beta
*/
public static final String getPendingGroupMsg = "GroupMessage.getPendingGroupMsg";
@ElementCollection @ElementCollection
private Map<Long, envoy.data.Message.MessageStatus> memberMessageStatus; private Map<Long, envoy.data.Message.MessageStatus> memberMessageStatus;
@Column(name = "last_status_change_date") @Column(name = "last_status_change_date")
@Temporal(TemporalType.TIMESTAMP) protected LocalDateTime lastStatusChangeDate;
protected Date lastStatusChangeDate;
/** /**
* The constructor for a database object. * The constructor for a database object.
@ -38,12 +54,12 @@ public class GroupMessage extends Message {
* @param groupMessage the {@link envoy.data.GroupMessage} to convert * @param groupMessage the {@link envoy.data.GroupMessage} to convert
* into a * into a
* database {@link GroupMessage} * database {@link GroupMessage}
* @param lastStatusChangeDate the {@link Date} to set * @param lastStatusChangeDate the time stamp to set
* @since Envoy Server Standalone v0.1-beta * @since Envoy Server Standalone v0.1-beta
*/ */
public GroupMessage(envoy.data.GroupMessage groupMessage, Date lastStatusChangeDate) { public GroupMessage(envoy.data.GroupMessage groupMessage, LocalDateTime lastStatusChangeDate) {
super(groupMessage); super(groupMessage);
memberMessageStatus = groupMessage.getMemberStatuses(); memberMessageStatus = groupMessage.getMemberStatuses();
this.lastStatusChangeDate = lastStatusChangeDate; this.lastStatusChangeDate = lastStatusChangeDate;
} }
@ -57,7 +73,7 @@ public class GroupMessage extends Message {
*/ */
@Override @Override
public envoy.data.GroupMessage toCommon() { public envoy.data.GroupMessage toCommon() {
return prepareBuilder().buildGroupMessage((Group) recipient.toCommon(), memberMessageStatus); return prepareBuilder().buildGroupMessage((Group) recipient.toCommon(), new HashMap<>(memberMessageStatus));
} }
/** /**
@ -78,11 +94,11 @@ public class GroupMessage extends Message {
* @return the date at which one of the member statuses changed last * @return the date at which one of the member statuses changed last
* @since Envoy Server Standalone v0.1-beta * @since Envoy Server Standalone v0.1-beta
*/ */
public Date getLastStatusChangeDate() { return lastStatusChangeDate; } public LocalDateTime getLastStatusChangeDate() { return lastStatusChangeDate; }
/** /**
* @param date the date to set * @param date the date to set
* @since Envoy Server Standalone v0.1-beta * @since Envoy Server Standalone v0.1-beta
*/ */
public void setLastStatusChangeDate(Date date) { lastStatusChangeDate = date; } public void setLastStatusChangeDate(LocalDateTime date) { lastStatusChangeDate = date; }
} }

View File

@ -194,6 +194,22 @@ public class PersistenceManager {
.getResultList(); .getResultList();
} }
/**
* Returns all groupMessages received while being offline or the ones that have
* changed.
*
* @param user the user who wants to receive his unread groupMessages
* @return all groupMessages that the client does not yet have (unread
* groupMessages)
* @since Envoy Server Standalone v0.1-alpha
*/
public List<GroupMessage> getPendingGroupMessages(User user) {
return entityManager.createNamedQuery(GroupMessage.getPendingGroupMsg)
.setParameter("userId", user.getID())
.setParameter("lastSeen", user.getLastSeen())
.getResultList();
}
/** /**
* Searches for users matching a search phrase. Contacts of the attached user * Searches for users matching a search phrase. Contacts of the attached user
* and the attached user is ignored. * and the attached user is ignored.

View File

@ -50,7 +50,7 @@ public class ConnectionManager implements ISocketIdListener {
public void socketCancelled(long socketID) { public void socketCancelled(long socketID) {
if (!pendingSockets.remove(socketID)) { if (!pendingSockets.remove(socketID)) {
// Notify contacts of this users offline-going // Notify contacts of this users offline-going
envoy.server.data.User user = PersistenceManager.getInstance().getUserByID(getUserIdBySocketID(socketID)); envoy.server.data.User user = PersistenceManager.getInstance().getUserByID(getUserIDBySocketID(socketID));
user.setStatus(UserStatus.OFFLINE); user.setStatus(UserStatus.OFFLINE);
user.setLastSeen(LocalDateTime.now()); user.setLastSeen(LocalDateTime.now());
UserStatusChangeProcessor.updateUserStatus(user); UserStatusChangeProcessor.updateUserStatus(user);
@ -87,7 +87,7 @@ public class ConnectionManager implements ISocketIdListener {
* @return the userId associated with this socketId * @return the userId associated with this socketId
* @since Envoy Server Standalone v0.1-alpha * @since Envoy Server Standalone v0.1-alpha
*/ */
public long getUserIdBySocketID(long socketID) { public long getUserIDBySocketID(long socketID) {
return sockets.entrySet().stream().filter(entry -> entry.getValue().equals(socketID)).findFirst().get().getKey(); return sockets.entrySet().stream().filter(entry -> entry.getValue().equals(socketID)).findFirst().get().getKey();
} }

View File

@ -4,6 +4,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.util.Set; import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import com.jenkov.nioserver.IMessageProcessor; import com.jenkov.nioserver.IMessageProcessor;
@ -50,11 +51,11 @@ public class ObjectMessageProcessor implements IMessageProcessor {
logger.fine("Received " + obj); logger.fine("Received " + obj);
// Process object // Process object
processors.stream().filter(p -> p.getInputClass().isInstance(obj)).forEach((@SuppressWarnings("rawtypes") ObjectProcessor p) -> { processors.stream().filter(p -> p.getInputClass().equals(obj.getClass())).forEach((@SuppressWarnings("rawtypes") ObjectProcessor p) -> {
try { try {
p.process(p.getInputClass().cast(obj), message.socketId, new ObjectWriteProxy(writeProxy)); p.process(p.getInputClass().cast(obj), message.socketId, new ObjectWriteProxy(writeProxy));
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); logger.log(Level.SEVERE, "Exception during processor execution: ", e);
} }
}); });
} catch (IOException | ClassNotFoundException e) { } catch (IOException | ClassNotFoundException e) {

View File

@ -1,11 +1,14 @@
package envoy.server.net; package envoy.server.net;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.stream.Stream;
import com.jenkov.nioserver.Message; import com.jenkov.nioserver.Message;
import com.jenkov.nioserver.WriteProxy; import com.jenkov.nioserver.WriteProxy;
import envoy.server.data.Contact;
import envoy.util.EnvoyLog; import envoy.util.EnvoyLog;
import envoy.util.SerializationUtils; import envoy.util.SerializationUtils;
@ -23,7 +26,8 @@ public class ObjectWriteProxy {
private final WriteProxy writeProxy; private final WriteProxy writeProxy;
private static final Logger logger = EnvoyLog.getLogger(ObjectWriteProxy.class); private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(ObjectWriteProxy.class);
/** /**
* Creates an instance of {@link ObjectWriteProxy}. * Creates an instance of {@link ObjectWriteProxy}.
@ -36,23 +40,49 @@ public class ObjectWriteProxy {
/** /**
* @param recipientSocketID the socket id of the recipient * @param recipientSocketID the socket id of the recipient
* @param obj the object to return to the client * @param obj the object to return to the client
* @throws IOException if the serialization of the object failed * @throws RuntimeException if the serialization of the object failed (this is
* highly unlikely)
* @since Envoy Server Standalone v0.1-alpha * @since Envoy Server Standalone v0.1-alpha
*/ */
public void write(long recipientSocketID, Object obj) throws IOException { public void write(long recipientSocketID, Object obj) {
// Create message targeted at the client // Create message targeted at the client
final Message response = writeProxy.getMessage(); final Message response = writeProxy.getMessage();
response.socketId = recipientSocketID; response.socketId = recipientSocketID;
logger.fine("Sending " + obj); logger.fine("Sending " + obj);
// Serialize object to byte array try {
final byte[] objBytes = SerializationUtils.writeToByteArray(obj);
// Acquire object length in bytes // Serialize object to byte array
final byte[] objLen = SerializationUtils.intToBytes(objBytes.length); final byte[] objBytes = SerializationUtils.writeToByteArray(obj);
response.writeToMessage(objLen); // Acquire object length in bytes
response.writeToMessage(objBytes); final byte[] objLen = SerializationUtils.intToBytes(objBytes.length);
writeProxy.enqueue(response);
response.writeToMessage(objLen);
response.writeToMessage(objBytes);
writeProxy.enqueue(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Sends an object to all contact in a set that are online.
*
* @param contacts the contacts to send the object to
* @param message the object to send
* @since Envoy Server Standalone v0.1-beta
*/
public void writeToOnlineContacts(Set<? extends Contact> contacts, Object message) { writeToOnlineContacts(contacts.stream(), message); }
/**
* Sends an object to all contact in a set that are online.
*
* @param contacts the contacts to send the object to
* @param message the object to send
* @since Envoy Server Standalone v0.1-beta
*/
public void writeToOnlineContacts(Stream<? extends Contact> contacts, Object message) {
contacts.map(Contact::getID).filter(connectionManager::isOnline).map(connectionManager::getSocketID).forEach(id -> write(id, message));
} }
} }

View File

@ -1,6 +1,5 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import java.util.logging.Logger; import java.util.logging.Logger;
import envoy.event.ElementOperation; import envoy.event.ElementOperation;
@ -24,10 +23,10 @@ public class ContactOperationProcessor implements ObjectProcessor<ContactOperati
private static final Logger logger = EnvoyLog.getLogger(ContactOperationProcessor.class); private static final Logger logger = EnvoyLog.getLogger(ContactOperationProcessor.class);
@Override @Override
public void process(ContactOperation evt, long socketId, ObjectWriteProxy writeProxy) throws IOException { public void process(ContactOperation evt, long socketId, ObjectWriteProxy writeProxy) {
switch (evt.getOperationType()) { switch (evt.getOperationType()) {
case ADD: case ADD:
final long userID = ConnectionManager.getInstance().getUserIdBySocketID(socketId); final long userID = ConnectionManager.getInstance().getUserIDBySocketID(socketId);
final long contactId = evt.get().getID(); final long contactId = evt.get().getID();
logger.fine(String.format("Adding user %s to the contact list of user %d.%n", evt.get(), userID)); logger.fine(String.format("Adding user %s to the contact list of user %d.%n", evt.get(), userID));
@ -39,7 +38,7 @@ public class ContactOperationProcessor implements ObjectProcessor<ContactOperati
new ContactOperation(PersistenceManager.getInstance().getUserByID(userID).toCommon(), ElementOperation.ADD)); new ContactOperation(PersistenceManager.getInstance().getUserByID(userID).toCommon(), ElementOperation.ADD));
break; break;
default: default:
logger.warning(String.format("Received %s with an unsupported operation.%n", evt)); logger.warning(String.format("Received %s with an unsupported operation.", evt));
} }
} }

View File

@ -33,7 +33,7 @@ public class ContactSearchProcessor implements ObjectProcessor<ContactSearchRequ
public void process(ContactSearchRequest request, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(ContactSearchRequest request, long socketID, ObjectWriteProxy writeProxy) throws IOException {
writeProxy.write(socketID, writeProxy.write(socketID,
new ContactSearchResult(PersistenceManager.getInstance() new ContactSearchResult(PersistenceManager.getInstance()
.searchUsers(request.get(), ConnectionManager.getInstance().getUserIdBySocketID(socketID)) .searchUsers(request.get(), ConnectionManager.getInstance().getUserIDBySocketID(socketID))
.stream() .stream()
.map(User::toCommon) .map(User::toCommon)
.collect(Collectors.toList()))); .collect(Collectors.toList())));

View File

@ -1,6 +1,5 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import envoy.event.ElementOperation; import envoy.event.ElementOperation;
@ -25,27 +24,21 @@ public class GroupCreationProcessor implements ObjectProcessor<GroupCreation> {
private final ConnectionManager connectionManager = ConnectionManager.getInstance(); private final ConnectionManager connectionManager = ConnectionManager.getInstance();
@Override @Override
public void process(GroupCreation groupCreation, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(GroupCreation groupCreation, long socketID, ObjectWriteProxy writeProxy) {
envoy.server.data.Group group = new envoy.server.data.Group(); envoy.server.data.Group group = new envoy.server.data.Group();
group.setName(groupCreation.get()); group.setName(groupCreation.get());
group.setContacts(new HashSet<>()); group.setContacts(new HashSet<>());
groupCreation.getInitialMemberIDs().stream().map(persistenceManager::getUserByID).forEach(group.getContacts()::add); groupCreation.getInitialMemberIDs().stream().map(persistenceManager::getUserByID).forEach(group.getContacts()::add);
group.getContacts().add(persistenceManager.getContactByID(connectionManager.getUserIdBySocketID(socketID))); group.getContacts().add(persistenceManager.getContactByID(connectionManager.getUserIDBySocketID(socketID)));
group.getContacts().forEach(c -> c.getContacts().add(group)); group.getContacts().forEach(c -> c.getContacts().add(group));
group.getContacts().add(persistenceManager.getUserByID(connectionManager.getUserIdBySocketID(socketID))); group.getContacts().add(persistenceManager.getUserByID(connectionManager.getUserIDBySocketID(socketID)));
persistenceManager.addContact(group); persistenceManager.addContact(group);
group.getContacts() group.getContacts()
.stream() .stream()
.map(Contact::getID) .map(Contact::getID)
.filter(connectionManager::isOnline) .filter(connectionManager::isOnline)
.map(connectionManager::getSocketID) .map(connectionManager::getSocketID)
.forEach(memberSocketID -> { .forEach(memberSocketID -> writeProxy.write(memberSocketID, new ContactOperation(group.toCommon(), ElementOperation.ADD)));
try {
writeProxy.write(memberSocketID, new ContactOperation(group.toCommon(), ElementOperation.ADD));
} catch (IOException e) {
e.printStackTrace();
}
});
} }
@Override @Override

View File

@ -1,13 +1,15 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException; import static envoy.data.Message.MessageStatus.*;
import java.util.Date;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.persistence.EntityExistsException; import javax.persistence.EntityExistsException;
import envoy.data.GroupMessage; import envoy.data.GroupMessage;
import envoy.data.Message.MessageStatus; import envoy.event.MessageStatusChange;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.net.ConnectionManager; import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy; import envoy.server.net.ObjectWriteProxy;
@ -23,45 +25,43 @@ import envoy.util.EnvoyLog;
*/ */
public class GroupMessageProcessor implements ObjectProcessor<GroupMessage> { public class GroupMessageProcessor implements ObjectProcessor<GroupMessage> {
private static final Logger logger = EnvoyLog.getLogger(GroupCreationProcessor.class); private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
private static final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(GroupCreationProcessor.class);
@Override @Override
public void process(GroupMessage groupMessage, long socketID, ObjectWriteProxy writeProxy) { public void process(GroupMessage groupMessage, long socketID, ObjectWriteProxy writeProxy) {
groupMessage.nextStatus(); groupMessage.nextStatus();
ConnectionManager connectionManager = ConnectionManager.getInstance();
final var members = PersistenceManager.getInstance().getGroupByID(groupMessage.getRecipientID()).getContacts(); // Update statuses to SENT / RECEIVED depending on online status
groupMessage.getMemberStatuses().replaceAll((id, oldStatus) -> MessageStatus.SENT); groupMessage.getMemberStatuses().replaceAll((memberID, status) -> connectionManager.isOnline(memberID) ? RECEIVED : SENT);
members.forEach(user -> setMemberStatus(connectionManager, groupMessage, user.getID()));
// Checks if all memberMessageStatuses are RECEIVED and if so sets the // Set status for sender to READ
// groupMessage Status to RECEIVED. groupMessage.getMemberStatuses().replace(groupMessage.getSenderID(), READ);
members.forEach(user -> { sendToMember(connectionManager, groupMessage, user.getID(), writeProxy); }); // Increment the overall status to RECEIVED if necessary
if (Collections.min(groupMessage.getMemberStatuses().values()) == RECEIVED) {
groupMessage.nextStatus();
// Notify the sender of the status change
writeProxy.write(socketID, new MessageStatusChange(groupMessage));
}
// Deliver the message to the recipients that are online
writeProxy.writeToOnlineContacts(
persistenceManager.getGroupByID(groupMessage.getRecipientID())
.getContacts()
.stream()
.filter(c -> c.getID() != groupMessage.getSenderID()),
groupMessage);
try { try {
PersistenceManager.getInstance().addMessage(new envoy.server.data.GroupMessage(groupMessage, new Date())); PersistenceManager.getInstance().addMessage(new envoy.server.data.GroupMessage(groupMessage, LocalDateTime.now()));
} catch (EntityExistsException e) { } catch (EntityExistsException e) {
logger.warning("Received a groupMessage with an ID that already exists"); logger.warning("Received a groupMessage with an ID that already exists");
} }
} }
private void sendToMember(ConnectionManager connectionManager, GroupMessage groupMessage, long memberID, ObjectWriteProxy writeProxy) {
if (connectionManager.isOnline(memberID)) try {
// If recipient is online, send the groupMessage directly
writeProxy.write(connectionManager.getSocketID(memberID), groupMessage);
} catch (IOException e) {
logger.warning("Recipient online. Failed to send message" + groupMessage.getID());
e.printStackTrace();
}
}
private void setMemberStatus(ConnectionManager connectionManager, GroupMessage groupMessage, long memberID) {
if (connectionManager.isOnline(memberID))
// Update the message status of the member to RECEIVED
groupMessage.getMemberStatuses().replace(memberID, MessageStatus.RECEIVED);
}
@Override @Override
public Class<GroupMessage> getInputClass() { return GroupMessage.class; } public Class<GroupMessage> getInputClass() { return GroupMessage.class; }
} }

View File

@ -0,0 +1,69 @@
package envoy.server.processors;
import static envoy.data.Message.MessageStatus.READ;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.logging.Level;
import java.util.logging.Logger;
import envoy.data.Message.MessageStatus;
import envoy.event.GroupMessageStatusChange;
import envoy.event.MessageStatusChange;
import envoy.server.data.GroupMessage;
import envoy.server.data.PersistenceManager;
import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy;
import envoy.util.EnvoyLog;
/**
* Project: <strong>envoy-server-standalone</strong><br>
* File: <strong>GroupMessageStatusChangeProcessor.java</strong><br>
* Created: <strong>03.07.2020</strong><br>
*
* @author Maximilian K&auml;fer
* @since Envoy Server Standalone v0.1-beta
*/
public class GroupMessageStatusChangeProcessor implements ObjectProcessor<GroupMessageStatusChange> {
private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
private static final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(MessageStatusChangeProcessor.class);
@Override
public void process(GroupMessageStatusChange statusChange, long socketID, ObjectWriteProxy writeProxy) {
GroupMessage gmsg = (GroupMessage) persistenceManager.getMessageByID(statusChange.getID());
// Any other status than READ is not supposed to be sent to the server
if (statusChange.get() != MessageStatus.READ) {
logger.log(Level.WARNING, "Invalid " + statusChange);
return;
}
// Apply the status change
gmsg.getMemberMessageStatus().replace(statusChange.getMemberID(), statusChange.get());
gmsg.setLastStatusChangeDate(LocalDateTime.now());
// Notifying the other members about the status change
final var userID = connectionManager.getUserIDBySocketID(socketID);
gmsg.getMemberMessageStatus()
.keySet()
.stream()
.filter(k -> userID != k)
.filter(connectionManager::isOnline)
.forEach(k -> writeProxy.write(connectionManager.getSocketID(k), statusChange));
// Increment overall status to READ if necessary
if (Collections.min(gmsg.getMemberMessageStatus().values()) == READ) {
gmsg.read();
// Notify online members about the status change
writeProxy.writeToOnlineContacts(gmsg.getRecipient().getContacts(),
new MessageStatusChange(gmsg.getID(), gmsg.getStatus(), LocalDateTime.now()));
}
persistenceManager.updateMessage(gmsg);
}
@Override
public Class<GroupMessageStatusChange> getInputClass() { return GroupMessageStatusChange.class; }
}

View File

@ -1,7 +1,5 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import envoy.event.GroupResize; import envoy.event.GroupResize;
import envoy.server.data.Contact; import envoy.server.data.Contact;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
@ -22,7 +20,7 @@ public class GroupResizeProcessor implements ObjectProcessor<GroupResize> {
private static final ConnectionManager connectionManager = ConnectionManager.getInstance(); private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
@Override @Override
public void process(GroupResize groupResize, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(GroupResize groupResize, long socketID, ObjectWriteProxy writeProxy) {
// Acquire the group to resize from the database // Acquire the group to resize from the database
var group = persistenceManager.getGroupByID(groupResize.getGroupID()); var group = persistenceManager.getGroupByID(groupResize.getGroupID());
@ -47,13 +45,7 @@ public class GroupResizeProcessor implements ObjectProcessor<GroupResize> {
.map(Contact::getID) .map(Contact::getID)
.filter(connectionManager::isOnline) .filter(connectionManager::isOnline)
.map(connectionManager::getSocketID) .map(connectionManager::getSocketID)
.forEach(memberSocketID -> { .forEach(memberSocketID -> writeProxy.write(memberSocketID, commonGroup));
try {
writeProxy.write(memberSocketID, commonGroup);
} catch (IOException e) {
e.printStackTrace();
}
});
} }
@Override @Override

View File

@ -1,20 +1,20 @@
package envoy.server.processors; package envoy.server.processors;
import static envoy.data.Message.MessageStatus.*;
import static envoy.data.User.UserStatus.ONLINE; import static envoy.data.User.UserStatus.ONLINE;
import static envoy.event.HandshakeRejection.*; import static envoy.event.HandshakeRejection.*;
import java.io.IOException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Arrays; import java.util.*;
import java.util.HashSet;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.persistence.NoResultException; import javax.persistence.NoResultException;
import envoy.data.LoginCredentials; import envoy.data.LoginCredentials;
import envoy.data.Message.MessageStatus; import envoy.event.GroupMessageStatusChange;
import envoy.event.HandshakeRejection; import envoy.event.HandshakeRejection;
import envoy.event.MessageStatusChange; import envoy.event.MessageStatusChange;
import envoy.server.data.GroupMessage;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.data.User; import envoy.server.data.User;
import envoy.server.net.ConnectionManager; import envoy.server.net.ConnectionManager;
@ -42,7 +42,7 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
private static final Logger logger = EnvoyLog.getLogger(LoginCredentialProcessor.class); private static final Logger logger = EnvoyLog.getLogger(LoginCredentialProcessor.class);
@Override @Override
public void process(LoginCredentials credentials, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(LoginCredentials credentials, long socketID, ObjectWriteProxy writeProxy) {
// Cache this write proxy for user-independant notifications // Cache this write proxy for user-independant notifications
UserStatusChangeProcessor.setWriteProxy(writeProxy); UserStatusChangeProcessor.setWriteProxy(writeProxy);
@ -83,25 +83,25 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
writeProxy.write(socketID, new HandshakeRejection(INTERNAL_ERROR)); writeProxy.write(socketID, new HandshakeRejection(INTERNAL_ERROR));
return; return;
} }
try { try {
// Checking that no user already has this identifier // Checking that no user already has this identifier
PersistenceManager.getInstance().getUserByName(credentials.getIdentifier()); PersistenceManager.getInstance().getUserByName(credentials.getIdentifier());
// This code only gets executed if this user already exists // This code only gets executed if this user already exists
logger.info("The requested user already exists."); logger.info("The requested user already exists.");
writeProxy.write(socketID, new HandshakeRejection(USERNAME_TAKEN)); writeProxy.write(socketID, new HandshakeRejection(USERNAME_TAKEN));
return; return;
} catch (NoResultException e) { } catch (NoResultException e) {
// Creation of a new user // Creation of a new user
user = new User(); user = new User();
user.setName(credentials.getIdentifier()); user.setName(credentials.getIdentifier());
user.setLastSeen(LocalDateTime.now()); user.setLastSeen(LocalDateTime.now());
user.setStatus(ONLINE); user.setStatus(ONLINE);
user.setPasswordHash(credentials.getPasswordHash()); user.setPasswordHash(credentials.getPasswordHash());
user.setContacts(new HashSet<>()); user.setContacts(new HashSet<>());
persistenceManager.addContact(user); persistenceManager.addContact(user);
logger.info("Registered new " + user); logger.info("Registered new " + user);
} }
} }
logger.info(user + " successfully authenticated."); logger.info(user + " successfully authenticated.");
@ -115,11 +115,12 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
writeProxy.write(socketID, user.toCommon()); writeProxy.write(socketID, user.toCommon());
final var pendingMessages = PersistenceManager.getInstance().getPendingMessages(user); final var pendingMessages = PersistenceManager.getInstance().getPendingMessages(user);
pendingMessages.removeIf(GroupMessage.class::isInstance);
logger.fine("Sending " + pendingMessages.size() + " pending messages to " + user + "..."); logger.fine("Sending " + pendingMessages.size() + " pending messages to " + user + "...");
for (var msg : pendingMessages) { for (var msg : pendingMessages) {
final var msgCommon = msg.toCommon(); final var msgCommon = msg.toCommon();
if (msg.getStatus() == MessageStatus.SENT) { if (msg.getStatus() == SENT) {
// Send the message // Send the message
writeProxy.write(socketID, msgCommon); writeProxy.write(socketID, msgCommon);
@ -133,6 +134,51 @@ public final class LoginCredentialProcessor implements ObjectProcessor<LoginCred
} }
} else writeProxy.write(socketID, new MessageStatusChange(msgCommon)); } else writeProxy.write(socketID, new MessageStatusChange(msgCommon));
} }
List<GroupMessage> pendingGroupMessages = PersistenceManager.getInstance().getPendingGroupMessages(user);
logger.fine("Sending " + pendingGroupMessages.size() + " pending group messages to " + user + "...");
for (var gmsg : pendingGroupMessages) {
final var gmsgCommon = gmsg.toCommon();
// Deliver the message to the user if he hasn't received it yet
if (gmsg.getMemberMessageStatus().get(user.getID()) == SENT) {
gmsg.getMemberMessageStatus().replace(user.getID(), RECEIVED);
gmsg.setLastStatusChangeDate(LocalDateTime.now());
writeProxy.write(socketID, gmsgCommon);
// Notify all online group members about the status change
writeProxy.writeToOnlineContacts(gmsg.getRecipient().getContacts(),
new GroupMessageStatusChange(gmsg.getID(), RECEIVED, LocalDateTime
.now(),
connectionManager.getUserIDBySocketID(socketID)));
if (Collections.min(gmsg.getMemberMessageStatus().values()) == RECEIVED) {
gmsg.received();
// Notify online members about the status change
writeProxy.writeToOnlineContacts(gmsg.getRecipient().getContacts(),
new MessageStatusChange(gmsg.getID(), gmsg.getStatus(), LocalDateTime.now()));
}
PersistenceManager.getInstance().updateMessage(gmsg);
} else {
// Sending group message status changes
if (gmsg.getStatus() == SENT && gmsg.getLastStatusChangeDate().isAfter(gmsg.getCreationDate())
|| gmsg.getStatus() == RECEIVED && gmsg.getLastStatusChangeDate().isAfter(gmsg.getReceivedDate())) {
gmsg.getMemberMessageStatus().forEach((memberID, memberStatus) ->
writeProxy.write(socketID, new GroupMessageStatusChange(gmsg.getID(), memberStatus, gmsg.getLastStatusChangeDate(), memberID)));
}
// Deliver just a status change instead of the whole message
if (gmsg.getStatus() == SENT && user.getLastSeen().isBefore(gmsg.getCreationDate())
|| gmsg.getStatus() == RECEIVED && user.getLastSeen().isBefore(gmsg.getReceivedDate()))
writeProxy.write(socketID, new MessageStatusChange(gmsgCommon));
}
}
} }
@Override @Override

View File

@ -1,13 +1,11 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.persistence.EntityExistsException; import javax.persistence.EntityExistsException;
import envoy.data.Message; import envoy.data.Message;
import envoy.data.Message.MessageStatus;
import envoy.event.MessageStatusChange; import envoy.event.MessageStatusChange;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.net.ConnectionManager; import envoy.server.net.ConnectionManager;
@ -29,14 +27,10 @@ public class MessageProcessor implements ObjectProcessor<Message> {
private static final PersistenceManager persistenceManager = PersistenceManager.getInstance(); private static final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private static final ConnectionManager connectionManager = ConnectionManager.getInstance(); private static final ConnectionManager connectionManager = ConnectionManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(MessageProcessor.class); private static final Logger logger = EnvoyLog.getLogger(MessageProcessor.class);
@Override @Override
public void process(Message message, long socketID, ObjectWriteProxy writeProxy) { public void process(Message message, long socketID, ObjectWriteProxy writeProxy) {
if (message.getStatus() != MessageStatus.WAITING) {
logger.warning("Received message with invalid status: " + message);
return;
}
message.nextStatus(); message.nextStatus();
// Convert to server message // Convert to server message
@ -62,8 +56,6 @@ public class MessageProcessor implements ObjectProcessor<Message> {
} }
} catch (EntityExistsException e) { } catch (EntityExistsException e) {
logger.log(Level.WARNING, "Received " + message + " with an ID that already exists!"); logger.log(Level.WARNING, "Received " + message + " with an ID that already exists!");
} catch (IOException e) {
logger.log(Level.WARNING, "Failed to deliver " + message + ":", e);
} }
} }

View File

@ -1,13 +1,15 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException; import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import envoy.data.Message.MessageStatus; import envoy.data.Message.MessageStatus;
import envoy.event.MessageStatusChange; import envoy.event.MessageStatusChange;
import envoy.exception.EnvoyException;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.net.ConnectionManager; import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy; import envoy.server.net.ObjectWriteProxy;
import envoy.util.EnvoyLog;
/** /**
* Project: <strong>envoy-server-standalone</strong><br> * Project: <strong>envoy-server-standalone</strong><br>
@ -19,13 +21,18 @@ import envoy.server.net.ObjectWriteProxy;
*/ */
public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStatusChange> { public class MessageStatusChangeProcessor implements ObjectProcessor<MessageStatusChange> {
private final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private final ConnectionManager connectionManager = ConnectionManager.getInstance(); private final ConnectionManager connectionManager = ConnectionManager.getInstance();
private final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private final Logger logger = EnvoyLog.getLogger(MessageStatusChangeProcessor.class);
@Override @Override
public void process(MessageStatusChange statusChange, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(MessageStatusChange statusChange, long socketID, ObjectWriteProxy writeProxy) throws IOException {
// Any other status than READ is not supposed to be sent to the server // Any other status than READ is not supposed to be sent to the server
if (statusChange.get() != MessageStatus.READ) throw new IOException(new EnvoyException(statusChange + " has an invalid status")); if (statusChange.get() != MessageStatus.READ) {
logger.log(Level.WARNING, "Invalid " + statusChange);
return;
}
final var msg = persistenceManager.getMessageByID(statusChange.getID()); final var msg = persistenceManager.getMessageByID(statusChange.getID());
msg.read(); msg.read();

View File

@ -5,8 +5,6 @@ import java.io.IOException;
import envoy.event.NameChange; import envoy.event.NameChange;
import envoy.server.data.Contact; import envoy.server.data.Contact;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.data.User;
import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy; import envoy.server.net.ObjectWriteProxy;
/** /**
@ -19,22 +17,16 @@ import envoy.server.net.ObjectWriteProxy;
*/ */
public class NameChangeProcessor implements ObjectProcessor<NameChange> { public class NameChangeProcessor implements ObjectProcessor<NameChange> {
private static final PersistenceManager persistenceManager = PersistenceManager.getInstance();
@Override @Override
public void process(NameChange nameChange, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(NameChange nameChange, long socketID, ObjectWriteProxy writeProxy) throws IOException {
PersistenceManager persistenceManager = PersistenceManager.getInstance(); Contact toUpdate = persistenceManager.getContactByID(nameChange.getID());
ConnectionManager connectionManager = ConnectionManager.getInstance();
Contact toUpdate = persistenceManager.getContactByID(nameChange.getID());
toUpdate.setName(nameChange.get()); toUpdate.setName(nameChange.get());
persistenceManager.updateContact(toUpdate); persistenceManager.updateContact(toUpdate);
// Notify online contacts of the name change // Notify online contacts of the name change
toUpdate.getContacts().stream().filter(User.class::isInstance).map(Contact::getID).filter(connectionManager::isOnline).forEach(userID -> { writeProxy.writeToOnlineContacts(toUpdate.getContacts(), nameChange);
try {
writeProxy.write(userID, nameChange);
} catch (IOException e) {
e.printStackTrace();
}
});
} }
@Override @Override

View File

@ -1,19 +1,17 @@
package envoy.server.processors; package envoy.server.processors;
import java.io.IOException;
import java.util.logging.Logger; import java.util.logging.Logger;
import envoy.data.User.UserStatus; import envoy.data.User.UserStatus;
import envoy.event.UserStatusChange; import envoy.event.UserStatusChange;
import envoy.server.data.PersistenceManager; import envoy.server.data.PersistenceManager;
import envoy.server.data.User; import envoy.server.data.User;
import envoy.server.net.ConnectionManager;
import envoy.server.net.ObjectWriteProxy; import envoy.server.net.ObjectWriteProxy;
import envoy.util.EnvoyLog; import envoy.util.EnvoyLog;
/** /**
* This processor handles incoming {@link UserStatusChange}.<br> * This processor handles incoming {@link UserStatusChange}.
* <br> * <p>
* Project: <strong>envoy-server-standalone</strong><br> * Project: <strong>envoy-server-standalone</strong><br>
* File: <strong>UserStatusChangeProcessor.java</strong><br> * File: <strong>UserStatusChangeProcessor.java</strong><br>
* Created: <strong>1 Feb 2020</strong><br> * Created: <strong>1 Feb 2020</strong><br>
@ -23,16 +21,16 @@ import envoy.util.EnvoyLog;
*/ */
public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChange> { public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChange> {
private static ObjectWriteProxy writeProxy; private static ObjectWriteProxy writeProxy;
private static PersistenceManager persistenceManager = PersistenceManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(UserStatusChangeProcessor.class); private static final PersistenceManager persistenceManager = PersistenceManager.getInstance();
private static final Logger logger = EnvoyLog.getLogger(UserStatusChangeProcessor.class);
@Override @Override
public Class<UserStatusChange> getInputClass() { return UserStatusChange.class; } public Class<UserStatusChange> getInputClass() { return UserStatusChange.class; }
@Override @Override
public void process(UserStatusChange input, long socketID, ObjectWriteProxy writeProxy) throws IOException { public void process(UserStatusChange input, long socketID, ObjectWriteProxy writeProxy) {
// new status should not equal old status // new status should not equal old status
if (input.get().equals(persistenceManager.getUserByID(input.getID()).getStatus())) { if (input.get().equals(persistenceManager.getUserByID(input.getID()).getStatus())) {
logger.warning("Received an unnecessary UserStatusChange"); logger.warning("Received an unnecessary UserStatusChange");
@ -55,7 +53,7 @@ public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChan
persistenceManager.updateContact(user); persistenceManager.updateContact(user);
// Handling for contacts that are already online // Handling for contacts that are already online
notifyContacts(user); writeProxy.writeToOnlineContacts(user.getContacts(), new UserStatusChange(user.getID(), user.getStatus()));
} }
/** /**
@ -64,25 +62,6 @@ public class UserStatusChangeProcessor implements ObjectProcessor<UserStatusChan
*/ */
public static void updateUserStatus(UserStatusChange evt) { updateUserStatus(persistenceManager.getUserByID(evt.getID())); } public static void updateUserStatus(UserStatusChange evt) { updateUserStatus(persistenceManager.getUserByID(evt.getID())); }
/**
* notifies active contacts of this {@link User} that his {@link UserStatus} has
* changed
*
* @param user the {@link User}
* @since Envoy Server Standalone v0.1-alpha
*/
private static void notifyContacts(User user) {
UserStatusChange evt = new UserStatusChange(user.getID(), user.getStatus());
ConnectionManager connectionManager = ConnectionManager.getInstance();
try {
for (envoy.server.data.Contact contact : user.getContacts())
if (connectionManager.isOnline(contact.getID())) writeProxy.write(connectionManager.getSocketID(contact.getID()), evt);
} catch (IOException e) {
e.printStackTrace();
logger.warning("Could not notify online contacts of user " + evt.getID() + " that his status has been changed");
}
}
/** /**
* This method is only called by the LoginCredentialProcessor because every * This method is only called by the LoginCredentialProcessor because every
* user needs to login (open a socket) before changing his status. * user needs to login (open a socket) before changing his status.