This repository has been archived on 2021-12-05. You can view files and clone it, but cannot push or open issues or pull requests.
envoy/client/src/main/java/envoy/client/net/Receiver.java

142 lines
4.0 KiB
Java

package envoy.client.net;
import java.io.*;
import java.net.SocketException;
import java.util.*;
import java.util.function.Consumer;
import java.util.logging.*;
import dev.kske.eventbus.core.EventBus;
import envoy.util.*;
/**
 * Receives objects from the server and passes them to processor objects based on their class.
 * <p>
 * Every object is expected in a length-value encoding: a 4 byte length prefix followed by exactly
 * that many bytes containing the serialized object.
 *
 * @author Kai S. K. Engelbart
 * @since Envoy Client v0.3-alpha
 */
public final class Receiver extends Thread {

    private boolean isAlive = true;

    private final InputStream in;
    private final Map<Class<?>, Consumer<?>> processors = new HashMap<>();

    private static final EventBus eventBus = EventBus.getInstance();
    private static final Logger logger = EnvoyLog.getLogger(Receiver.class);

    /**
     * Creates an instance of {@link Receiver}. The created thread is named "Receiver" and marked
     * as a daemon thread.
     *
     * @param in the {@link InputStream} to parse objects from
     * @since Envoy Client v0.3-alpha
     */
    public Receiver(InputStream in) {
        super("Receiver");
        this.in = in;
        setDaemon(true);
    }

    /**
     * Starts the receiver loop. When an object is read, it is passed to the processor registered
     * for its class, or dispatched on the event bus if no such processor exists.
     * <p>
     * The loop ends when the end of the stream is reached or when a {@link SocketException} or
     * {@link EOFException} occurs. Any other exception is logged and the loop continues.
     *
     * @since Envoy Client v0.3-alpha
     */
    @Override
    public void run() {
        while (isAlive)
            try {

                // Read object length
                final byte[] lenBytes = new byte[4];
                if (readFully(lenBytes) != lenBytes.length) {

                    // The stream ended before a complete length prefix arrived
                    connectionLost();
                    continue;
                }
                final int len = SerializationUtils.bytesToInt(lenBytes, 0);
                logger.log(Level.FINEST, "Expecting object of length " + len + ".");

                // Read object into byte array. A single read() call may deliver only a part of
                // the object, so keep reading until the buffer is full or the stream ends.
                final byte[] objBytes = new byte[len];
                final int bytesRead = readFully(objBytes);
                logger.log(Level.FINEST, "Read " + bytesRead + " bytes.");

                // Catch LV encoding errors
                if (len != bytesRead) {

                    // Server has stopped sending, i.e. because he went offline
                    if (bytesRead == -1) {
                        connectionLost();
                        continue;
                    }
                    logger.log(Level.WARNING,
                        String.format(
                            "LV encoding violated: expected %d bytes, received %d bytes. Discarding object...",
                            len, bytesRead));
                    continue;
                }

                try (ObjectInputStream oin =
                    new ObjectInputStream(new ByteArrayInputStream(objBytes))) {
                    final Object obj = oin.readObject();
                    logger.log(Level.FINE, "Received " + obj);

                    // Get appropriate processor. The cast is safe as long as processors are only
                    // added through registerProcessor, which ties the key class to the consumer
                    // type; registerProcessors relies on the caller to keep that pairing.
                    @SuppressWarnings("unchecked")
                    final Consumer<Object> processor =
                        (Consumer<Object>) processors.get(obj.getClass());

                    // Dispatch to the processor if present
                    if (processor != null)
                        processor.accept(obj);

                    // Dispatch to the event bus if the object has no processor
                    else
                        eventBus.dispatch(obj);

                    // TODO: Log DeadEvent from Event Bus 1.1.0 to notify if neither a processor
                    // nor an event handler could be located for the received object
                }
            } catch (final SocketException | EOFException e) {

                // Connection probably closed by client.
                logger.log(Level.INFO, "Exiting receiver...");
                return;
            } catch (final Exception e) {
                logger.log(Level.SEVERE, "Error on receiver thread", e);
            }
    }

    /**
     * Reads from the input stream until the given buffer is completely filled or the end of the
     * stream is reached.
     *
     * @param buffer the buffer to fill
     * @return the number of bytes stored in the buffer, or {@code -1} if the end of the stream was
     *         reached before any byte could be read into a non-empty buffer
     * @throws IOException if reading from the input stream fails
     */
    private int readFully(byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            final int count = in.read(buffer, total, buffer.length - total);
            if (count == -1)
                return total == 0 ? -1 : total;
            total += count;
        }
        return total;
    }

    /**
     * Stops the receiver loop after the end of the input stream has been reached.
     */
    private void connectionLost() {
        isAlive = false;
        logger.log(Level.INFO, "Lost connection to the server. Exiting receiver...");
    }

    /**
     * Adds an object processor to this {@link Receiver}. It will be called once an object of the
     * accepted class has been received. A processor previously registered for the same class is
     * replaced.
     *
     * @param processorClass the object class accepted by the processor
     * @param processor      the object processor
     * @param <T>            the type of object accepted by the processor
     * @since Envoy Client v0.3-alpha
     */
    public <T> void registerProcessor(Class<T> processorClass, Consumer<T> processor) {
        processors.put(processorClass, processor);
    }

    /**
     * Adds a map of object processors to this {@link Receiver}.
     *
     * @param processors the processors to add, mapped by the object class they accept
     * @since Envoy Client v0.1-beta
     */
    public void registerProcessors(Map<Class<?>, ? extends Consumer<?>> processors) {
        this.processors.putAll(processors);
    }

    /**
     * Removes all object processors registered at this {@link Receiver}.
     *
     * @since Envoy Client v0.3-alpha
     */
    public void removeAllProcessors() {
        processors.clear();
    }
}