package edu.cornell.cs.nlp.spf.reliabledist;

import edu.cornell.cs.nlp.spf.base.hashvector.IHashVector;
import edu.cornell.cs.nlp.spf.mr.lambda.Term;
import edu.cornell.cs.nlp.spf.reliabledist.WorkerSummary;
import edu.cornell.cs.nlp.utils.log.ILogger;
import edu.cornell.cs.nlp.utils.log.LoggerFactory;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:edu/cornell/cs/nlp/spf/reliabledist/EnslavedRemoteManager.class */
public class EnslavedRemoteManager implements IManager, Runnable, ITaskExecutor {
    public static final ILogger LOG = LoggerFactory.create((Class<?>) EnslavedRemoteManager.class);
    private final ReliableManager globalManager;
    private final int id;
    private final ObjectInputStream inputStream;
    private final String name;
    private final ObjectOutputStream outputStream;
    private final long pingFrequency;
    private final long pingTimeout;
    private MessageWithId activeMessage = null;
    private final Map<Long, Task> activeTasks = new HashMap();
    private final Map<Long, Long> activeTaskStartTime = new HashMap();
    private double executionTimeDecayingAverage = IHashVector.ZERO_VALUE;
    private int freeSpots = -1;
    private final Queue<Object> incomingObjects = new ConcurrentLinkedQueue();
    private boolean isFailed = false;
    private boolean isRunning = true;
    private long lastHeard = System.currentTimeMillis();
    private long lastPing = System.currentTimeMillis();
    private final AtomicLong messageIdGenerator = new AtomicLong(0);
    private final Queue<MessageWithId> queuedMessages = new LinkedList();
    private final AtomicInteger taskAccepted = new AtomicInteger(0);
    private final AtomicInteger taskReturned = new AtomicInteger(0);
    private final ObjectReadingThread readingThread = new ObjectReadingThread();

    /* loaded from: input_file:edu/cornell/cs/nlp/spf/reliabledist/EnslavedRemoteManager$ObjectReadingThread.class */
    private class ObjectReadingThread extends Thread {
        private ObjectReadingThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (EnslavedRemoteManager.this.isRunning) {
                try {
                    EnslavedRemoteManager.this.incomingObjects.offer(EnslavedRemoteManager.this.inputStream.readObject());
                    synchronized (EnslavedRemoteManager.this) {
                        EnslavedRemoteManager.this.notifyAll();
                    }
                } catch (IOException e) {
                    EnslavedRemoteManager.LOG.error("Reading thread exception: %s", (Throwable) e);
                    EnslavedRemoteManager.this.isFailed = true;
                    EnslavedRemoteManager.this.isRunning = false;
                } catch (ClassNotFoundException e2) {
                    EnslavedRemoteManager.LOG.error("Failed to read object: %s", (Throwable) e2);
                }
            }
        }
    }

    public EnslavedRemoteManager(Socket socket, long j, long j2, ReliableManager reliableManager, int i) throws IOException {
        this.pingFrequency = j;
        this.pingTimeout = j2;
        this.globalManager = reliableManager;
        this.id = i;
        this.name = socket.getInetAddress().toString() + Term.TYPE_SEPARATOR + socket.getPort();
        this.outputStream = new ObjectOutputStream(socket.getOutputStream());
        this.inputStream = new ObjectInputStream(socket.getInputStream());
        this.readingThread.start();
    }

    public boolean allFree() {
        return this.activeTasks.isEmpty();
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.ITaskExecutor
    public boolean execute(Task task) {
        if (this.freeSpots == 0) {
            return false;
        }
        MessageWithId messageWithId = new MessageWithId(this.messageIdGenerator.getAndIncrement(), DistributionConstants.WORK, task);
        synchronized (this) {
            this.activeTasks.put(Long.valueOf(task.getId()), task);
            this.activeTaskStartTime.put(Long.valueOf(task.getId()), Long.valueOf(System.currentTimeMillis()));
            this.freeSpots--;
            this.taskAccepted.incrementAndGet();
        }
        qsend(messageWithId);
        return true;
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.IManager
    public boolean existsFree() {
        boolean z;
        synchronized (this) {
            z = this.freeSpots > 0 && this.queuedMessages.isEmpty();
        }
        return z;
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.IManager
    public <ENV extends AbstractEnvironment> ENV getEnviroment() {
        return null;
    }

    public double getExecutionTimeAverage() {
        return this.executionTimeDecayingAverage;
    }

    public int getId() {
        return this.id;
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.ITaskExecutor
    public String getName() {
        return this.name;
    }

    public WorkerSummary getSummary() {
        return new WorkerSummary.Builder(this.id, getName()).setTasksAccepted(this.taskAccepted.get()).setTaskCompelted(this.taskReturned.get()).setFailed(this.isFailed).setMeanTime(this.executionTimeDecayingAverage).setFreeSpots(this.freeSpots).build();
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.IManager
    public boolean isRunning() {
        return this.isRunning;
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.IManager
    public boolean reportResult(ITaskExecutor iTaskExecutor, Task task, TaskResult taskResult) {
        throw new IllegalStateException();
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            serverLoop();
        } catch (RuntimeException e) {
            this.isRunning = false;
            throw e;
        }
    }

    public void serverLoop() {
        MessageWithId messageWithId;
        while (this.isRunning) {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                if (currentTimeMillis - this.lastHeard > this.pingFrequency) {
                    if (currentTimeMillis - this.lastHeard > this.pingTimeout * 2) {
                        LOG.error("Entering failed state on ping failure.");
                        this.isRunning = false;
                        this.isFailed = true;
                        return;
                    } else if (currentTimeMillis - this.lastPing > this.pingFrequency) {
                        this.lastPing = currentTimeMillis;
                        LOG.debug("Sending ping");
                        if (!send(new Message(DistributionConstants.PING))) {
                            this.isFailed = true;
                            this.isRunning = false;
                            return;
                        }
                    }
                }
                synchronized (this) {
                    if (this.activeMessage != null || this.queuedMessages.size() <= 0) {
                        messageWithId = null;
                    } else {
                        this.activeMessage = this.queuedMessages.poll();
                        messageWithId = this.activeMessage;
                    }
                }
                if (messageWithId != null && !send(this.activeMessage)) {
                    this.isFailed = true;
                    this.isRunning = false;
                    return;
                }
                if (!this.incomingObjects.isEmpty()) {
                    this.lastHeard = currentTimeMillis;
                    this.lastPing = currentTimeMillis;
                    processReply();
                }
                synchronized (this) {
                    try {
                        wait(1000L);
                    } catch (InterruptedException e) {
                    }
                }
            } catch (Exception e2) {
                LOG.error("Entering failed state on a local exception: %s", (Throwable) e2);
                this.isRunning = false;
                this.isFailed = true;
                return;
            }
        }
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.IManager
    public boolean setupCommand(Map<String, String> map) {
        MessageWithId messageWithId = new MessageWithId(this.messageIdGenerator.getAndIncrement(), DistributionConstants.INIT);
        for (Map.Entry<String, String> entry : map.entrySet()) {
            messageWithId.put(entry.getKey(), entry.getValue());
        }
        qsend(messageWithId);
        return true;
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.IManager
    public boolean setupEnviroment(AbstractEnvironment abstractEnvironment) {
        qsend(new MessageWithId(this.messageIdGenerator.getAndIncrement(), DistributionConstants.ENIVROMENT, abstractEnvironment));
        return true;
    }

    public void terminate() {
        synchronized (this) {
            LOG.info("Sending shutdown message to %s", this.name);
            this.queuedMessages.clear();
            qsend(new MessageWithId(this.messageIdGenerator.getAndIncrement(), DistributionConstants.SHUTDOWN));
            this.readingThread.interrupt();
            this.isRunning = false;
        }
    }

    @Override // edu.cornell.cs.nlp.spf.reliabledist.IManager
    public boolean updateEnviroment(List<SerializedEnvironmentConfig> list) {
        qsend(new MessageWithId(this.messageIdGenerator.getAndIncrement(), DistributionConstants.MODIFY_ENVIROMENT, list));
        return true;
    }

    private void processReply() throws Exception {
        Object poll = this.incomingObjects.poll();
        if (!(poll instanceof Message)) {
            LOG.error("Invalid object received: %s", poll.getClass());
            return;
        }
        Message message = (Message) poll;
        LOG.debug("Received command from client: %s", message.getCommand());
        String str = message.get(DistributionConstants._free);
        synchronized (this) {
            if (this.freeSpots == -1) {
                this.freeSpots = Integer.parseInt(str);
            }
        }
        String command = message.getCommand();
        if (DistributionConstants.AK.equals(command)) {
            if (message.get(DistributionConstants._ackid) != null) {
                long parseLong = Long.parseLong(message.get(DistributionConstants._ackid));
                long messageId = this.activeMessage.getMessageId();
                synchronized (this) {
                    if (parseLong == messageId) {
                        this.activeMessage = null;
                    } else {
                        LOG.error("active command not lining up!");
                    }
                }
                return;
            }
            return;
        }
        if (DistributionConstants.RETURN.equals(command)) {
            processResultMessage(message);
            synchronized (this) {
                this.freeSpots++;
            }
            this.taskReturned.incrementAndGet();
            return;
        }
        if (command.equals(DistributionConstants.SUMMARY)) {
            LOG.error("Not properly implemented yet");
        } else if (command.equals(DistributionConstants.ERROR)) {
            LOG.info("Error message: %s", message.get(DistributionConstants._message));
            if (message.get(DistributionConstants._stacktrace) != null) {
                LOG.info(message.get(DistributionConstants._stacktrace));
            }
        }
    }

    private void processResultMessage(Message message) {
        long taskId = message.getResult().getTaskId();
        Task task = this.activeTasks.get(Long.valueOf(taskId));
        this.executionTimeDecayingAverage = (this.executionTimeDecayingAverage + (System.currentTimeMillis() - this.activeTaskStartTime.get(Long.valueOf(taskId)).longValue())) / 2.0d;
        TaskResult result = message.getResult();
        synchronized (this) {
            if (this.activeTasks.remove(Long.valueOf(taskId)) == null || this.activeTaskStartTime.remove(Long.valueOf(taskId)) == null) {
                LOG.error("BUG: Unknown returned task ID");
            }
        }
        this.globalManager.reportResult(this, task, result);
    }

    private void qsend(MessageWithId messageWithId) {
        synchronized (this) {
            this.queuedMessages.add(messageWithId);
            notifyAll();
        }
    }

    private boolean send(Message message) {
        synchronized (this.outputStream) {
            try {
                this.outputStream.writeObject(message);
                this.outputStream.flush();
                this.outputStream.reset();
            } catch (IOException e) {
                LOG.error("Failed to send message: %s", (Throwable) e);
                return false;
            } catch (RuntimeException e2) {
                LOG.error("Failed to send message: %s", message.getCommand());
                LOG.error("Failed to send message: %s", (Throwable) e2);
                throw e2;
            }
        }
        return true;
    }
}
