package edu.cornell.cs.nlp.spf.reliabledist;

import edu.cornell.cs.nlp.spf.explat.IResourceRepository;
import edu.cornell.cs.nlp.spf.explat.ParameterizedExperiment;
import edu.cornell.cs.nlp.spf.explat.resources.IResourceObjectCreator;
import edu.cornell.cs.nlp.spf.explat.resources.usage.ResourceUsage;
import edu.cornell.cs.nlp.spf.mr.lambda.Term;
import edu.cornell.cs.nlp.utils.log.ILogger;
import edu.cornell.cs.nlp.utils.log.LoggerFactory;
import edu.cornell.cs.nlp.utils.log.thread.LoggingThreadFactory;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:edu/cornell/cs/nlp/spf/reliabledist/EnslavedLocalManager.class */
public class EnslavedLocalManager implements IManager, Runnable, ITaskExecutor {
    /** Logger shared by all instances of this class. */
    public static final ILogger LOG = LoggerFactory.create((Class<?>) EnslavedLocalManager.class);
    /** Local worker pool that tasks are handed to by {@link #execute(Task)}. */
    private final LocalWorkerPool localPool;
    /** Host name or address of the master that {@link #run()} connects to. */
    private final String masterAddress;
    /** Port of the master that {@link #run()} connects to. */
    private final int masterPort;
    /** Name returned by {@link #getName()}. */
    private final String name;
    /** Current environment; {@code null} until {@code setupEnviroment} stores one. */
    private AbstractEnvironment enviroment = null;
    /** Object stream read by {@link #clientLoop()}; assigned in {@link #run()} once a socket is open. */
    private ObjectInputStream inputStream = null;
    /** Checked at the top of every {@link #clientLoop()} iteration; cleared by {@link #terminate()} and by {@link #run()} on a runtime exception. */
    private boolean isRunning = true;
    /** Object stream every outgoing message is written to; assigned in {@link #run()} once a socket is open. */
    private ObjectOutputStream outputStream = null;
    /** Class loader holding the JAR URLs accumulated by {@code registerJar}; starts with no URLs. */
    private URLClassLoader urlClassLoader = new URLClassLoader(new URL[0]);
    /** Counter (starting at 0) used to give each worker created for the pool a distinct id. */
    private final AtomicInteger workerIdGenerator = new AtomicInteger(0);

    /* loaded from: input_file:edu/cornell/cs/nlp/spf/reliabledist/EnslavedLocalManager$Creator.class */
    /**
     * Resource creator that builds an {@link EnslavedLocalManager} from experiment parameters.
     */
    public static class Creator implements IResourceObjectCreator<EnslavedLocalManager> {
        private final String type;

        /** Registers the creator under the default resource type {@code "tinydist.worker"}. */
        public Creator() {
            this("tinydist.worker");
        }

        /**
         * @param str resource type name reported by {@link #type()}
         */
        public Creator(String str) {
            this.type = str;
        }

        /**
         * Reads {@code addr}, the port, {@code threads}, {@code name} (default {@code "tinydist"})
         * and the optional {@code logDir} from the parameters and constructs the manager.
         * The repository argument is not used.
         */
        @Override
        public EnslavedLocalManager create(ParameterizedExperiment.Parameters parameters, IResourceRepository iResourceRepository) {
            final String masterHost = parameters.get("addr");
            final int port = parameters.getAsInteger(DistributionConstants.PORT);
            final int threadCount = parameters.getAsInteger("threads");
            final ThreadFactory factory = new LoggingThreadFactory(parameters.get("name", "tinydist"));
            final String workerName = parameters.get("name", "tinydist");

            // A missing logDir is passed on as null.
            File logDirectory = null;
            if (parameters.contains("logDir")) {
                logDirectory = parameters.getAsFile("logDir");
            }
            return new EnslavedLocalManager(masterHost, port, threadCount, factory, workerName, logDirectory);
        }

        /** Returns the resource type name this creator was constructed with. */
        @Override
        public String type() {
            return type;
        }

        /** Describes the resource and the parameters {@link #create} reads. */
        @Override
        public ResourceUsage usage() {
            return ResourceUsage.builder(type, EnslavedLocalManager.class)
                    .setDescription("Work manager")
                    .addParam("addr", String.class, "Master address")
                    .addParam(DistributionConstants.PORT, Integer.class, "Master port")
                    .addParam("name", String.class, "Worker name (used for logging)")
                    .addParam("logDir", File.class, "Logging directory for task execution (default: stderr)")
                    .addParam("threads", Integer.class, "Number of worker threads")
                    .build();
        }
    }

    /**
     * Creates a manager bound to a master address and backed by a local worker pool.
     *
     * @param str           master address to connect to
     * @param i             master port
     * @param i2            first argument handed to the {@link LocalWorkerPool} (the creator passes the "threads" parameter)
     * @param threadFactory thread factory handed to the pool
     * @param str2          name reported by {@link #getName()}
     * @param file          passed to every {@link Worker} created for the pool; may be {@code null}
     */
    public EnslavedLocalManager(String str, int i, int i2, ThreadFactory threadFactory, String str2, File file) {
        this.name = str2;
        this.masterPort = i;
        this.masterAddress = str;
        // Each worker gets the next sequential id as its name.
        this.localPool = new LocalWorkerPool(i2, threadFactory,
                () -> new Worker(this, Integer.toString(this.workerIdGenerator.getAndIncrement()), file));
    }

    /**
     * Renders the stack trace of an exception as text, one frame per line.
     * Each line has the form {@code "\t exception \t<frame>\n"}; an empty stack
     * trace yields the empty string. Causes of the exception are not included.
     *
     * @param exc exception whose own stack frames are rendered
     * @return the formatted frames
     */
    private static String stackToString(Exception exc) {
        // StringBuilder avoids re-copying the accumulated text for every frame.
        final StringBuilder rendered = new StringBuilder();
        for (StackTraceElement frame : exc.getStackTrace()) {
            rendered.append("\t exception \t").append(frame.toString()).append("\n");
        }
        return rendered.toString();
    }

    /**
     * Reads objects from the master's input stream and dispatches them by command
     * until the manager is no longer running or an exception occurs.
     *
     * <p>Messages carrying an id are acknowledged before being handled. Objects that
     * are not {@link Message} instances (including {@code null}) are logged and
     * skipped. Any exception ends the loop after an error message has been sent to
     * the master. A SHUTDOWN command exits the JVM.
     */
    public void clientLoop() {
        while (true) {
            synchronized (this) {
                if (!this.isRunning) {
                    return;
                }
            }
            try {
                final Object received = this.inputStream.readObject();
                if (!(received instanceof Message)) {
                    // Previously fell through to the cast below, which turned a
                    // stray object into a fatal ClassCastException (or NPE for null).
                    LOG.error("Invalid object received: %s", received == null ? "null" : received.getClass().toString());
                    continue;
                }
                final Message message = (Message) received;
                if (message instanceof MessageWithId) {
                    sendAck((MessageWithId) message);
                }
                final String command = message.getCommand();
                LOG.debug("Received command: %s", command);
                if (DistributionConstants.PING.equals(command)) {
                    sendReply(DistributionConstants.AK);
                } else if (DistributionConstants.SUMMARY.equals(command)) {
                    sendReply(DistributionConstants.SUMMARY);
                } else if (DistributionConstants.ENIVROMENT.equals(command)) {
                    setupEnviroment(message.getEnvironment());
                } else if (DistributionConstants.MODIFY_ENVIROMENT.equals(command)) {
                    updateEnviroment(message.getEnvUpdates());
                } else if (DistributionConstants.WORK.equals(command)) {
                    synchronized (this) {
                        final Task task = message.getTask();
                        LOG.info("Received task %d", Long.valueOf(task.getId()));
                        // NOTE(review): the result of execute() is ignored, so a task
                        // rejected for lack of a free worker is only logged locally.
                        execute(task);
                    }
                } else if (DistributionConstants.INIT.equals(command)) {
                    setupCommand(message.get());
                } else if (DistributionConstants.SHUTDOWN.equals(command)) {
                    LOG.info("Shutting down");
                    System.exit(0);
                }
            } catch (Exception e) {
                LOG.error("Exception in main loop: %s", (Throwable) e);
                sendErroredMessage("Fatal Exception in Main Loop", e);
                return;
            }
        }
    }

    /**
     * Hands a task to the local pool if a worker is free.
     *
     * @param task task to run
     * @return the pool's answer from {@code addWork}, or {@code false} (after logging)
     *         when no worker is free
     */
    @Override
    public boolean execute(Task task) {
        if (!existsFree()) {
            LOG.error("no free space!");
            return false;
        }
        return this.localPool.addWork(task);
    }

    /** Reports whether the local pool says a free worker exists. */
    @Override
    public boolean existsFree() {
        final boolean anyFree = localPool.existsFree();
        return anyFree;
    }

    /**
     * Returns the currently stored environment, cast to the type the caller expects.
     * The cast is unchecked; the result is {@code null} if no environment has been stored.
     */
    @Override
    public <ENV extends AbstractEnvironment> ENV getEnviroment() {
        final AbstractEnvironment current = this.enviroment;
        return (ENV) current;
    }

    /** Returns the name given to this manager at construction. */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Returns the running flag. The flag is read without taking the lock on this
     * object.
     */
    @Override
    public boolean isRunning() {
        final boolean running = this.isRunning;
        return running;
    }

    /**
     * Sends a task result back to the master while holding the lock on this manager.
     * Only {@code taskResult} is used; the executor and task arguments are ignored.
     *
     * @return {@code true} if the reply was written successfully
     */
    @Override
    public boolean reportResult(ITaskExecutor iTaskExecutor, Task task, TaskResult taskResult) {
        synchronized (this) {
            return sendTaskReply(taskResult);
        }
    }

    /**
     * Connects to the master and runs {@link #clientLoop()}, reconnecting after a
     * two-second pause whenever the connection attempt fails or the loop returns.
     *
     * <p>The method returns when the manager is no longer running, when a
     * {@link RuntimeException} escapes the connection attempt (the manager is then
     * marked as not running), or when the thread is interrupted while waiting to
     * reconnect. The socket is closed on every path.
     */
    @Override
    public void run() {
        while (true) {
            // Stop reconnecting once terminate() has been called.
            synchronized (this) {
                if (!this.isRunning) {
                    return;
                }
            }
            Socket socket = null;
            try {
                socket = new Socket(this.masterAddress, this.masterPort);
                this.outputStream = new ObjectOutputStream(socket.getOutputStream());
                this.inputStream = new ObjectInputStream(socket.getInputStream());
                clientLoop();
            } catch (UnknownHostException e) {
                LOG.error("Don't know about host: %s", this.masterAddress);
            } catch (IOException e) {
                LOG.error("Couldn't get I/O for the connection to: %s", this.masterAddress);
            } catch (RuntimeException e) {
                LOG.error("Unexpected runtime exception in connection loop: %s", (Throwable) e);
                synchronized (this) {
                    this.isRunning = false;
                }
                return;
            } finally {
                // Runs on every exit path, including the RuntimeException return above.
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException ignored) {
                        // Best effort: nothing useful can be done if close fails.
                    }
                }
            }
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop instead of
                // spinning through reconnect attempts with the flag set.
                Thread.currentThread().interrupt();
                LOG.info("Interrupted while waiting to reconnect, stopping");
                return;
            }
        }
    }

    /**
     * Handles an initialization command taken from the given key/value map.
     *
     * <p>The AWS-key, download-file and JAR-file commands are not implemented and
     * raise a {@link RuntimeException}. Any other command is accepted without
     * further action.
     *
     * @param map values of the init message; the command is read from the
     *            {@code DistributionConstants._initcommand} key
     * @return {@code false} if the map holds no init command, {@code true} otherwise
     * @throws RuntimeException for the three unimplemented commands
     */
    @Override
    public synchronized boolean setupCommand(Map<String, String> map) {
        final String command = map.get(DistributionConstants._initcommand);
        if (command == null) {
            // Previously dereferenced unconditionally, producing a NullPointerException.
            LOG.error("Init message is missing its command");
            return false;
        }
        if (command.equals(DistributionConstants.AWSKEY)
                || command.equals(DistributionConstants.DOWNLOADFILE)
                || command.equals(DistributionConstants.JARFILE)) {
            throw new RuntimeException("not implemented");
        }
        return true;
    }

    /**
     * Replaces the current environment, but only while the pool reports all workers free.
     *
     * @param abstractEnvironment environment to store
     * @return {@code true} if the environment was stored; {@code false} (after
     *         sending an error message to the master) if jobs are still running
     */
    @Override
    public synchronized boolean setupEnviroment(AbstractEnvironment abstractEnvironment) {
        final boolean poolIdle = this.localPool.allFree();
        if (poolIdle) {
            LOG.info("Setup environment: %s", abstractEnvironment.getClass());
            this.enviroment = abstractEnvironment;
            return true;
        }
        sendErroredMessage("Attempting to setup enviroment while jobs are running.");
        return false;
    }

    /**
     * Marks the manager as not running, terminates the local pool and closes both
     * object streams. Each stream is closed independently, so a failure to close
     * one does not prevent closing the other; failures are logged, not thrown.
     */
    public synchronized void terminate() {
        this.isRunning = false;
        this.localPool.terminate();
        try {
            if (this.outputStream != null) {
                this.outputStream.close();
            }
        } catch (Exception e) {
            LOG.error("Exception when closing output stream: %s", (Throwable) e);
        }
        try {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (Exception e) {
            LOG.error("Exception when closing input stream: %s", (Throwable) e);
        }
    }

    /**
     * Applies a list of serialized updates to the current environment, in order.
     *
     * <p>Nothing is applied while jobs are running or before an environment has
     * been set up; in both cases an error message is sent to the master. If an
     * update cannot be de-serialized, an error message is sent and the updates
     * applied before it remain in effect.
     *
     * @param list serialized updates to apply
     * @return {@code true} if every update was applied, {@code false} otherwise
     */
    @Override
    public boolean updateEnviroment(List<SerializedEnvironmentConfig> list) {
        if (!this.localPool.allFree()) {
            sendErroredMessage("Attempting to update enviroment while jobs are running.");
            return false;
        }
        if (this.enviroment == null) {
            // Previously a NullPointerException when an update arrived before any environment.
            sendErroredMessage("Attempting to update enviroment before one was set up.");
            return false;
        }
        try {
            for (SerializedEnvironmentConfig update : list) {
                LOG.info("Applying environment update %s", Integer.valueOf(update.getId()));
                this.enviroment.update(new EnvironmentConfig<>(update));
            }
            return true;
        } catch (IOException | ClassNotFoundException e) {
            sendErroredMessage("Failed to de-serialize environment update.", e);
            return false;
        }
    }

    /**
     * Replaces the class loader with a new one that knows all previous URLs plus
     * the given JAR file. The previous loader is not closed.
     *
     * @param str path of the JAR file to add
     * @return {@code true} on success; {@code false} (after logging) if any exception occurs
     */
    private boolean registerJar(String str) {
        try {
            final URL jarUrl = new File(str).toURI().toURL();
            final URL[] known = this.urlClassLoader.getURLs();
            final URL[] extended = new URL[known.length + 1];
            System.arraycopy(known, 0, extended, 0, known.length);
            extended[known.length] = jarUrl;
            this.urlClassLoader = new URLClassLoader(extended);
            return true;
        } catch (Exception e) {
            LOG.error("Exception when loading JAR: %s", (Throwable) e);
            return false;
        }
    }

    /**
     * Writes a message to the master, first stamping it with the pool's current
     * number of free workers. The stream is flushed and then reset after each write.
     *
     * @param message message to send; it is modified by the added free-worker entry
     * @return {@code true} if the write succeeded, {@code false} (after logging) on an I/O error
     */
    private synchronized boolean send(Message message) {
        boolean delivered;
        try {
            final String freeWorkers = Integer.toString(this.localPool.numFreeWorkers());
            message.put(DistributionConstants._free, freeWorkers);
            final ObjectOutputStream out = this.outputStream;
            out.writeObject(message);
            out.flush();
            out.reset();
            delivered = true;
        } catch (IOException e) {
            LOG.error("Failed to send: ", (Throwable) e);
            delivered = false;
        }
        return delivered;
    }

    /**
     * Acknowledges a message by sending an AK message that carries its id.
     *
     * @return the result of {@link #send(Message)}
     */
    private boolean sendAck(MessageWithId messageWithId) {
        final long acknowledgedId = messageWithId.getMessageId();
        final Message ack = new Message(DistributionConstants.AK);
        ack.put(DistributionConstants._ackid, String.valueOf(acknowledgedId));
        return send(ack);
    }

    /**
     * Logs an error description and sends it to the master as an ERROR message.
     *
     * @param str error description
     * @return the result of {@link #send(Message)}
     */
    private boolean sendErroredMessage(String str) {
        LOG.info("Sending error message: %s", str);
        final Message errorReport = new Message(DistributionConstants.ERROR);
        errorReport.put(DistributionConstants._message, str);
        final boolean delivered = send(errorReport);
        return delivered;
    }

    /**
     * Logs an error and its exception, then sends both to the master as an ERROR
     * message. The message text is the description, a separator and the exception's
     * {@code toString()}; the exception's stack frames are sent as a separate entry.
     *
     * @param str error description
     * @param exc exception that caused the error
     * @return the result of {@link #send(Message)}
     */
    private boolean sendErroredMessage(String str, Exception exc) {
        LOG.info("Sending error message with exception: %s", str);
        LOG.info((Throwable) exc);
        final String description = str + Term.TYPE_SEPARATOR + exc.toString();
        final String frames = stackToString(exc);
        final Message errorReport = new Message(DistributionConstants.ERROR);
        errorReport.put(DistributionConstants._message, description);
        errorReport.put(DistributionConstants._stacktrace, frames);
        return send(errorReport);
    }

    /**
     * Sends a message built from the given string alone.
     *
     * @param str value passed to the {@link Message} constructor
     * @return the result of {@link #send(Message)}
     */
    private boolean sendReply(String str) {
        final Message reply = new Message(str);
        return send(reply);
    }

    /**
     * Sends a task result to the master wrapped in a RETURN message.
     *
     * @param taskResult result to deliver
     * @return the result of {@link #send(Message)}
     */
    private synchronized boolean sendTaskReply(TaskResult taskResult) {
        final Message resultMessage = new Message(DistributionConstants.RETURN, taskResult);
        return send(resultMessage);
    }
}
