package envoy.client.net; import java.io.*; import java.net.SocketException; import java.util.*; import java.util.function.Consumer; import java.util.logging.*; import envoy.util.*; /** * Receives objects from the server and passes them to processor objects based * on their class. *

* Project: envoy-client
* File: Receiver.java
* Created: 30.12.2019
*
* @author Kai S. K. Engelbart
* @since Envoy Client v0.3-alpha
*/
public final class Receiver extends Thread {

    private static final Logger logger = EnvoyLog.getLogger(Receiver.class);

    /** Number of bytes of the length prefix that precedes every object. */
    private static final int LENGTH_PREFIX_SIZE = 4;

    private final InputStream in;

    // Processors are registered from other threads while the receiver thread
    // looks them up, so the map is wrapped in a synchronized view.
    private final Map<Class<?>, Consumer<?>> processors = Collections.synchronizedMap(new HashMap<>());

    // Only written by the receiver thread itself (inside the receive loop).
    private boolean isAlive = true;

    /**
     * Creates an instance of {@link Receiver}.
     *
     * @param in the {@link InputStream} to parse objects from
     * @since Envoy Client v0.3-alpha
     */
    public Receiver(InputStream in) {
        super("Receiver");
        this.in = in;
    }

    /**
     * Starts the receiver loop. When an object is read, it is passed to the
     * appropriate processor.
     * <p>
     * The loop ends when the stream reaches its end or when a
     * {@link SocketException} / {@link EOFException} is thrown. Any other
     * exception is logged and the loop continues with the next object.
     *
     * @since Envoy Client v0.3-alpha
     */
    @Override
    public void run() {
        while (isAlive) {
            try {
                receiveNext();
            } catch (final SocketException | EOFException e) {
                // Connection probably closed by client.
                logger.log(Level.INFO, "Exiting receiver...");
                return;
            } catch (final Exception e) {
                logger.log(Level.SEVERE, "Error on receiver thread", e);
            }
        }
    }

    /**
     * Reads one length-prefixed object from the stream, deserializes it and
     * hands it to the matching processor.
     *
     * @throws IOException            if reading or deserializing fails
     * @throws ClassNotFoundException if the class of the received object is
     *                                not available
     */
    private void receiveNext() throws IOException, ClassNotFoundException {

        // Read object length
        final byte[] lenBytes = new byte[LENGTH_PREFIX_SIZE];
        if (readFully(lenBytes) < lenBytes.length) {
            // The stream ended before a complete length prefix arrived
            connectionLost();
            return;
        }
        final int len = SerializationUtils.bytesToInt(lenBytes, 0);
        logger.log(Level.FINEST, "Expecting object of length " + len + ".");

        // Read object into byte array
        final byte[] objBytes = new byte[len];
        final int bytesRead = readFully(objBytes);
        logger.log(Level.FINEST, "Read " + bytesRead + " bytes.");

        // readFully only returns early at the end of the stream, so a mismatch
        // means that the server stopped sending in the middle of an object
        if (bytesRead != len) {
            if (bytesRead > 0)
                logger.log(Level.WARNING,
                        String.format("LV encoding violated: expected %d bytes, received %d bytes. Discarding object...", len, bytesRead));
            connectionLost();
            return;
        }

        // NOTE(review): Java-native deserialization of data received over the
        // network; consider restricting the accepted classes if the peer is
        // not fully trusted.
        try (ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(objBytes))) {
            final Object obj = oin.readObject();
            logger.log(Level.FINE, "Received " + obj);
            dispatch(obj);
        }
    }

    /**
     * Passes a received object to the processor registered for its exact
     * runtime class. Logs a warning if no such processor exists.
     *
     * @param obj the received object
     */
    private void dispatch(Object obj) {

        // Get appropriate processor. The cast relies on processors having been
        // registered under the class they actually accept.
        @SuppressWarnings("unchecked")
        final Consumer<Object> processor = (Consumer<Object>) processors.get(obj.getClass());
        if (processor == null)
            logger.log(Level.WARNING, String.format("The received object has the %s for which no processor is defined.", obj.getClass()));
        else
            processor.accept(obj);
    }

    /**
     * Fills the given buffer from the input stream. A single call to
     * {@link InputStream#read(byte[], int, int)} may deliver fewer bytes than
     * requested, so reading is repeated until the buffer is full or the end of
     * the stream is reached.
     *
     * @param buffer the buffer to fill
     * @return the number of bytes stored in the buffer; less than
     *         {@code buffer.length} only if the stream ended
     * @throws IOException if the underlying stream fails
     */
    private int readFully(byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            final int count = in.read(buffer, total, buffer.length - total);
            if (count == -1)
                break;
            total += count;
        }
        return total;
    }

    /**
     * Marks the receiver as stopped after the end of the stream was reached.
     */
    private void connectionLost() {
        isAlive = false;
        logger.log(Level.INFO, "Lost connection to the server. Exiting receiver...");
    }

    /**
     * Adds an object processor to this {@link Receiver}. It will be called once an
     * object of the accepted class has been received.
     *
     * @param processorClass the object class accepted by the processor
     * @param processor      the object processor
     * @param <T>            the type of object accepted by the processor
     * @since Envoy Client v0.3-alpha
     */
    public <T> void registerProcessor(Class<T> processorClass, Consumer<? super T> processor) {
        processors.put(processorClass, processor);
    }

    /**
     * Adds a map of object processors to this {@link Receiver}.
     *
     * @param processors the processors to add
     * @since Envoy Client v0.1-beta
     */
    public void registerProcessors(Map<Class<?>, ? extends Consumer<?>> processors) {
        this.processors.putAll(processors);
    }

    /**
     * Removes all object processors registered at this {@link Receiver}.
     *
     * @since Envoy Client v0.3-alpha
     */
    public void removeAllProcessors() {
        processors.clear();
    }
}