package edu.cornell.cs.nlp.spf.reliabledist;

import edu.cornell.cs.nlp.spf.base.concurrency.Shutdownable;
import edu.cornell.cs.nlp.spf.explat.IResourceRepository;
import edu.cornell.cs.nlp.spf.explat.ParameterizedExperiment;
import edu.cornell.cs.nlp.spf.explat.resources.IResourceObjectCreator;
import edu.cornell.cs.nlp.spf.explat.resources.usage.ResourceUsage;
import edu.cornell.cs.nlp.spf.reliabledist.ManagerSummary;
import edu.cornell.cs.nlp.utils.collections.ListUtils;
import edu.cornell.cs.nlp.utils.composites.Pair;
import edu.cornell.cs.nlp.utils.log.ILogger;
import edu.cornell.cs.nlp.utils.log.LoggerFactory;
import edu.cornell.cs.nlp.utils.log.thread.LoggingThreadFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

/* loaded from: input_file:edu/cornell/cs/nlp/spf/reliabledist/ReliableManager.class */
public class ReliableManager implements Runnable, Shutdownable {
    /** Class-wide logger; assigned in the static initializer. */
    public static final ILogger LOG;
    /** Setup commands replayed to every remote manager when it registers. */
    private final List<Map<String, String>> connectionCommands;
    /** Connection timeout handed to each newly created remote manager. */
    private final long connectionTimeout;
    /** Thread that executes this manager's {@link #run()} loop. */
    private final Thread mythread;
    /** Ping frequency handed to each newly created remote manager. */
    private final long pingFrequency;
    /** Thread that accepts incoming connections from remote managers. */
    private final Thread register;
    /** Port on which incoming connections are accepted. */
    private final int registerPort;
    /** File the periodic summary is written to; {@code null} disables the summary dump. */
    private final File summaryFile;
    /** Minimal time, in milliseconds, between two summary dumps. */
    private final long summaryFrequency;
    /** Factory used to create the register thread, the main thread and one thread per remote manager. */
    private final ThreadFactory threadFactory;
    /** Decompiler-exposed assertion flag; set in the static initializer and read by setupEnviroment. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Backing list for {@link #getDoneTasks()}; nothing in this file adds to it. */
    private final List<Task> completed = new LinkedList();
    /** Current environment; {@code null} until one is set up and again after a boundary is created. */
    private AbstractEnvironment currentEnviroment = null;
    /** Monitor held while the current environment is replaced, reset or updated. */
    private final Object environmentLock = new Object();
    /** Generates ids for serialized environment updates; reset to 0 whenever the environment is replaced. */
    private final AtomicInteger environmentUpdateIdGenerator = new AtomicInteger(0);
    /** Futures of submitted tasks keyed by task id; accessed while synchronized on the map itself. */
    private final Map<Long, JobFuture<?>> futures = new HashMap();
    /** Main-loop flag; cleared by {@link #shutdown()}. */
    private boolean isRunning = true;
    /** Generates ids for newly connected remote managers. */
    private final AtomicInteger managerIdGenerator = new AtomicInteger(0);
    /** Registered remote managers; mutated while holding this object's monitor. */
    private final List<EnslavedRemoteManager> managers = new LinkedList();
    /** Managers the main loop found to be no longer running. */
    private final List<IManager> nonworkingManager = new LinkedList();
    /** Tasks waiting to be dispatched to a remote manager. */
    private final Queue<Task> queuedTasks = new LinkedList();
    /** For each registered manager, the tasks dispatched to it whose results were not reported yet. */
    private final Map<EnslavedRemoteManager, List<Task>> runningTasks = new HashMap();
    /** Generates ids for submitted tasks. */
    private final AtomicLong taskIdGenerator = new AtomicLong(0);
    /** Maps a dispatched task to the executor it was handed to; cleared when a boundary is created. */
    private final Map<Task, ITaskExecutor> taskWorker = new HashMap();
    /** Monitor object associated with termination; nothing in this file calls notify on it. */
    private final Object terminationLock = new Object();
    /** Number of task results accepted by reportResult. */
    private final AtomicInteger totalCompletedTask = new AtomicInteger(0);
    /** Number of tasks re-queued because their manager stopped running. */
    private final AtomicInteger totalRedone = new AtomicInteger(0);

    /* loaded from: input_file:edu/cornell/cs/nlp/spf/reliabledist/ReliableManager$Builder.class */
    /**
     * Fluent builder for {@link ReliableManager}.
     *
     * <p>Defaults: ping frequency 20000, summary frequency 20000, timeout
     * 200000, no summary file. The port has no usable default and must be set
     * (directly or through a configuration file) before {@link #build()}.
     */
    public static class Builder {
        private final ThreadFactory threadFactory;
        private final List<Map<String, String>> connectionCommands = new ArrayList<>();
        private long pingFrequency = 20000;
        private int port = -1;
        private File summaryFile = null;
        private long summaryFrequency = 20000;
        private long timeout = 200000;

        /**
         * @param threadFactory factory the built manager uses for all of its threads
         */
        public Builder(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }

        /**
         * Creates the manager from the values collected so far.
         *
         * @return the configured manager (not started)
         * @throws IllegalStateException if no non-negative port was set
         */
        public ReliableManager build() {
            if (this.port >= 0) {
                return new ReliableManager(this.port, this.connectionCommands, this.pingFrequency, this.timeout, this.threadFactory, this.summaryFile, this.summaryFrequency);
            }
            throw new IllegalStateException("Port not set");
        }

        /**
         * Applies the commands found in a configuration file. The port command
         * sets the port; the AWS key, download-file and jar-file commands each
         * take two tab-separated arguments and are recorded as connection
         * commands. Any other command is ignored.
         *
         * @param file configuration file to read
         * @return this builder
         * @throws FileNotFoundException if the file cannot be opened
         * @throws IOException if reading the file fails
         */
        public Builder configureFromFile(File file) throws FileNotFoundException, IOException {
            for (final Pair<String, String> command : ReliableManager.readConfigurationCommands(file)) {
                final String name = command.first();
                final String[] args = command.second().split("\t");
                if (name.equals(DistributionConstants.PORT)) {
                    this.port = Integer.parseInt(args[0]);
                    continue;
                }
                if (name.equals(DistributionConstants.AWSKEY)) {
                    addTwoArgumentCommand(DistributionConstants.AWSKEY, DistributionConstants._awsacsess, DistributionConstants._awssecret, args);
                    continue;
                }
                if (name.equals(DistributionConstants.DOWNLOADFILE)) {
                    addTwoArgumentCommand(DistributionConstants.DOWNLOADFILE, DistributionConstants._bucket, DistributionConstants._file, args);
                    continue;
                }
                if (name.equals(DistributionConstants.JARFILE)) {
                    addTwoArgumentCommand(DistributionConstants.JARFILE, DistributionConstants._bucket, DistributionConstants._file, args);
                }
            }
            return this;
        }

        public Builder setPingFrequency(long j) {
            this.pingFrequency = j;
            return this;
        }

        public Builder setPort(int i) {
            this.port = i;
            return this;
        }

        public Builder setSummaryFile(File file) {
            this.summaryFile = file;
            return this;
        }

        public Builder setSummaryFrequency(long j) {
            this.summaryFrequency = j;
            return this;
        }

        public Builder setTimeout(long j) {
            this.timeout = j;
            return this;
        }

        /** Records a connection command whose payload is the first two entries of {@code args}. */
        private void addTwoArgumentCommand(String command, String firstKey, String secondKey, String[] args) {
            // Both arguments are read before anything is recorded, so a short
            // argument list fails without leaving a partial command behind.
            final String firstValue = args[0];
            final String secondValue = args[1];
            final HashMap<String, String> payload = new HashMap<>();
            payload.put(DistributionConstants._initcommand, command);
            payload.put(firstKey, firstValue);
            payload.put(secondKey, secondValue);
            setupCommand(payload);
        }

        /** Appends a connection command; always returns {@code true}. */
        private boolean setupCommand(HashMap<String, String> hashMap) {
            this.connectionCommands.add(hashMap);
            return true;
        }
    }

    /* loaded from: input_file:edu/cornell/cs/nlp/spf/reliabledist/ReliableManager$Creator.class */
    /**
     * Resource creator that builds a {@link ReliableManager} from experiment
     * parameters. The accepted parameters are listed by {@link #usage()}.
     */
    public static class Creator implements IResourceObjectCreator<ReliableManager> {
        private final String type;

        /** Creates a creator registered under the type name {@code tinydist.reliable}. */
        public Creator() {
            this("tinydist.reliable");
        }

        /**
         * @param str type name this creator is registered under
         */
        public Creator(String str) {
            this.type = str;
        }

        /**
         * Builds a manager from the given parameters. Optional parameters that
         * are absent leave the {@link Builder} defaults in place.
         *
         * @param parameters experiment parameters to read
         * @param iResourceRepository resource repository (not used here)
         * @return the configured manager
         * @throws RuntimeException wrapping any failure to read the {@code config} file
         * @throws IllegalStateException if no port was provided (raised by the builder)
         */
        @Override
        public ReliableManager create(ParameterizedExperiment.Parameters parameters, IResourceRepository iResourceRepository) {
            Builder builder = new Builder(new LoggingThreadFactory("tinydist"));
            if (parameters.contains("summary")) {
                builder.setSummaryFile(parameters.getAsFile("summary"));
            }
            if (parameters.contains("summaryFreq")) {
                builder.setSummaryFrequency(parameters.getAsLong("summaryFreq"));
            }
            if (parameters.contains(DistributionConstants.PORT)) {
                builder.setPort(parameters.getAsInteger(DistributionConstants.PORT));
            }
            if (parameters.contains("pingFreq")) {
                builder.setPingFrequency(parameters.getAsInteger("pingFreq"));
            }
            if (parameters.contains("timeout")) {
                // Read the same key that was just checked and that usage()
                // documents; the previous code read "pingTimeout" instead.
                builder.setTimeout(parameters.getAsInteger("timeout"));
            }
            if (parameters.contains("config")) {
                try {
                    builder.configureFromFile(parameters.getAsFile("config"));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return builder.build();
        }

        @Override
        public String type() {
            return this.type;
        }

        @Override
        public ResourceUsage usage() {
            return ResourceUsage.builder(this.type, ReliableManager.class)
                    .setDescription("Reliable manager for TinyDist")
                    .addParam("summary", File.class, "File to dump the summary (default: none)")
                    .addParam("summaryFreq", Long.class, "Summary dump frequency (default: 20sec)")
                    .addParam(DistributionConstants.PORT, Integer.class, "Incoming connection port")
                    .addParam("pingFreq", Integer.class, "Ping fequency (default: 20000)")
                    .addParam("timeout", Integer.class, "Connection timeout (default: 200000)")
                    .addParam("config", File.class, "Configuration file")
                    .build();
        }
    }

    /* loaded from: input_file:edu/cornell/cs/nlp/spf/reliabledist/ReliableManager$RegisterThread.class */
    /**
     * Accepts connections from remote managers. Each loop iteration opens a
     * server socket on the register port, waits for a single connection, wraps
     * it in an {@link EnslavedRemoteManager} running on its own thread and
     * registers it with the enclosing manager. The server socket is closed at
     * the end of every iteration, including when the iteration fails.
     */
    private class RegisterThread implements Runnable {
        private RegisterThread() {
        }

        @Override
        public void run() {
            while (true) {
                // try-with-resources closes the listening socket whether the
                // iteration succeeds or throws; failures are logged and the
                // loop starts listening again.
                try (ServerSocket serverSocket = new ServerSocket(ReliableManager.this.registerPort)) {
                    final EnslavedRemoteManager manager = new EnslavedRemoteManager(
                            serverSocket.accept(),
                            ReliableManager.this.pingFrequency,
                            ReliableManager.this.connectionTimeout,
                            ReliableManager.this,
                            ReliableManager.this.managerIdGenerator.getAndIncrement());
                    ReliableManager.this.threadFactory.newThread(manager).start();
                    ReliableManager.LOG.info("Starting new manager: %s -> %s", Integer.valueOf(manager.getId()), manager.getName());
                    ReliableManager.this.registerManager(manager);
                } catch (Exception e) {
                    ReliableManager.LOG.error("Failed to create connection and start worker thread: %s", (Throwable) e);
                    ReliableManager.LOG.error("Restarting listening thread");
                }
            }
        }
    }

    public ReliableManager(int i, List<Map<String, String>> list, long j, long j2, ThreadFactory threadFactory, File file, long j3) {
        this.registerPort = i;
        this.connectionCommands = list;
        this.pingFrequency = j;
        this.connectionTimeout = j2;
        this.threadFactory = threadFactory;
        this.summaryFile = file;
        this.summaryFrequency = j3;
        this.register = threadFactory.newThread(new RegisterThread());
        this.mythread = threadFactory.newThread(this);
        LOG.info("Init %s: summaryFile=%s ...", getClass(), file);
        LOG.info("Init %s: summaryFrequency=%d ...", getClass(), Long.valueOf(j3));
        LOG.info("Init %s: registerPort=%d ...", getClass(), Integer.valueOf(i));
        LOG.info("Init %s: pingFrequency=%d ...", getClass(), Long.valueOf(j));
        LOG.info("Init %s: pingTimeout=%d", getClass(), Long.valueOf(j2));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads configuration commands from a file. Every non-blank line has the
     * form {@code <command>\t<arguments>}; the line is split at the first tab
     * only, so the arguments part may itself contain tabs.
     *
     * @param file configuration file to read
     * @return the (command, arguments) pairs in file order
     * @throws FileNotFoundException if the file cannot be opened
     * @throws IOException if reading fails or a non-blank line contains no tab
     */
    public static List<Pair<String, String>> readConfigurationCommands(File file) throws FileNotFoundException, IOException {
        final List<Pair<String, String>> commands = new LinkedList<>();
        // try-with-resources guarantees the reader is closed on every path.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    // Tolerate blank lines, e.g. a trailing newline at end of file.
                    continue;
                }
                final String[] fields = line.split("\t", 2);
                if (fields.length < 2) {
                    throw new IOException("Malformed configuration line (expected <command>\\t<arguments>): " + line);
                }
                commands.add(Pair.of(fields[0], fields[1]));
            }
        }
        return commands;
    }

    /**
     * Blocks until this manager is terminated, the timeout elapses, or the
     * calling thread is interrupted, whichever happens first.
     *
     * <p>The wait is performed on this manager's own monitor because
     * {@link #shutdown()} clears the running flag and calls
     * {@code notifyAll()} while holding that monitor. The previous
     * implementation called {@code wait} on a lock it did not hold, which
     * always fails with {@link IllegalMonitorStateException}.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the manager is terminated when this method returns
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
        final long timeoutNanos = timeUnit.toNanos(j);
        final long startNanos = System.nanoTime();
        synchronized (this) {
            // Loop: the monitor is also notified when tasks are queued, and
            // spurious wake-ups are allowed.
            while (!isTerminated()) {
                final long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
                final long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
                if (remainingMillis <= 0) {
                    // Never call wait(0): that would block without a timeout.
                    break;
                }
                wait(remainingMillis);
            }
            return isTerminated();
        }
    }

    /**
     * Tells whether the environment may currently be replaced or modified.
     *
     * @return {@code true} iff no task is queued or running
     */
    public boolean canCreateBoundary() {
        final int outstanding = getRemainingOutstandingTasks();
        return outstanding == 0;
    }

    /**
     * Drops the current environment and forgets which executor handled which
     * task. Only possible while no task is outstanding.
     *
     * @return {@code true} if the boundary was created, {@code false} if tasks
     *         are still outstanding (nothing is changed in that case)
     */
    public synchronized boolean createBoundary() {
        final boolean allowed = canCreateBoundary();
        if (allowed) {
            synchronized (this.environmentLock) {
                this.currentEnviroment = null;
                this.environmentUpdateIdGenerator.set(0);
            }
            this.taskWorker.clear();
        }
        return allowed;
    }

    /**
     * Submits a job for remote execution.
     *
     * @param function the job; must also implement {@link Serializable}
     * @return a future registered under the id of the newly queued task
     * @throws IllegalArgumentException if {@code function} is not serializable
     */
    public <ENV extends AbstractEnvironment, OUTPUT> JobFuture<OUTPUT> execute(Function<ENV, OUTPUT> function) {
        final boolean serializable = function instanceof Serializable;
        if (!serializable) {
            throw new IllegalArgumentException("Class not serializable: " + function.getClass().getName());
        }
        final long taskId = this.taskIdGenerator.getAndIncrement();
        Task task = new Task(function, taskId);
        final JobFuture<OUTPUT> result = new JobFuture<>();
        // Register the future before queuing the task.
        synchronized (this.futures) {
            this.futures.put(Long.valueOf(task.getId()), result);
        }
        execute(task);
        return result;
    }

    /**
     * Tells whether at least one registered manager reports free capacity.
     *
     * <p>The manager list is mutated under this object's monitor by the main
     * loop and by manager registration, so it is iterated under the same
     * monitor here to avoid a {@link java.util.ConcurrentModificationException}.
     *
     * @return {@code true} if some registered manager has a free slot
     */
    public boolean existsFree() {
        synchronized (this) {
            for (final EnslavedRemoteManager manager : this.managers) {
                if (manager.existsFree()) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * @return a read-only view of the completed-task list
     */
    public List<Task> getDoneTasks() {
        final List<Task> readOnlyView = Collections.unmodifiableList(this.completed);
        return readOnlyView;
    }

    /**
     * Returns the current environment cast to the type the caller expects. The
     * cast is unchecked; the result is {@code null} when no environment is set.
     *
     * @return the current environment, possibly {@code null}
     */
    @SuppressWarnings("unchecked")
    public <ENV extends AbstractEnvironment> ENV getEnviroment() {
        final AbstractEnvironment environment = this.currentEnviroment;
        return (ENV) environment;
    }

    /**
     * @return the number of tasks that are queued plus the number that are
     *         currently running on some manager
     */
    public int getRemainingOutstandingTasks() {
        synchronized (this) {
            final int queued = this.queuedTasks.size();
            final int running = numRunning();
            return queued + running;
        }
    }

    /**
     * Builds a snapshot summary: one entry per registered manager plus the
     * counts of failed managers, completed tasks and redone tasks.
     *
     * @return the summary
     */
    public synchronized ManagerSummary getSummary() {
        final ManagerSummary.Builder summary = new ManagerSummary.Builder();
        for (final EnslavedRemoteManager worker : this.managers) {
            summary.addWorker(worker.getSummary());
        }
        summary.setFailedWorkers(this.nonworkingManager.size());
        summary.setCompletedTasks(this.totalCompletedTask.get());
        summary.setRedoneTasks(this.totalRedone.get());
        return summary.build();
    }

    /**
     * @return {@code true} while the main-loop thread is alive
     */
    public boolean isRunning() {
        final Thread mainLoop = this.mythread;
        return mainLoop.isAlive();
    }

    /**
     * Tells whether {@link #shutdown()} has been requested.
     *
     * <p>The previous implementation returned {@code isRunning()}, i.e. it
     * reported "shut down" exactly while the main loop was alive. The running
     * flag is only ever cleared by {@link #shutdown()}, so its negation is the
     * shutdown state.
     *
     * @return {@code true} once shutdown has been requested
     */
    @Override
    public boolean isShutdown() {
        return !this.isRunning;
    }

    /**
     * @return {@code true} once the running flag has been cleared
     */
    @Override
    public boolean isTerminated() {
        final boolean stillRunning = this.isRunning;
        return !stillRunning;
    }

    /**
     * @return the total number of tasks currently assigned to registered
     *         managers and not yet reported back
     */
    public int numRunning() {
        synchronized (this) {
            return this.runningTasks.values().stream().mapToInt(List::size).sum();
        }
    }

    /**
     * Accepts the result of a task from the executor that ran it and completes
     * the matching future.
     *
     * @param iTaskExecutor executor reporting the result
     * @param task the finished task
     * @param taskResult its result
     * @return {@code false} if the executor is not a registered manager or the
     *         task is not among its running tasks; {@code true} otherwise
     * @throws IllegalStateException if no future is registered for the task
     */
    public boolean reportResult(ITaskExecutor iTaskExecutor, Task task, TaskResult taskResult) {
        synchronized (this) {
            if (this.managers.contains(iTaskExecutor)) {
                final List<Task> inFlight = this.runningTasks.get(iTaskExecutor);
                if (inFlight.remove(task)) {
                    final Long taskId = Long.valueOf(task.getId());
                    synchronized (this.futures) {
                        final JobFuture<?> pending = this.futures.get(taskId);
                        if (pending == null) {
                            throw new IllegalStateException("Future is missing");
                        }
                        this.futures.remove(taskId);
                        pending.setResult(iTaskExecutor, taskResult);
                    }
                    this.totalCompletedTask.getAndIncrement();
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Main loop. Until the running flag is cleared, each iteration (holding
     * this object's monitor) re-queues the tasks of managers that stopped
     * running, hands queued tasks to free managers and, when a summary file is
     * configured and the summary interval has elapsed, rewrites that file. Any
     * exception thrown by an iteration is logged and the loop continues.
     */
    @Override
    public void run() {
        long lastSummaryTime = 0;
        while (this.isRunning) {
            try {
                synchronized (this) {
                    resubmitTasksOfFailedManagers();
                    dispatchQueuedTasks();
                    if (this.summaryFile != null && System.currentTimeMillis() - lastSummaryTime > this.summaryFrequency) {
                        writeSummary();
                        // Only a successful dump resets the timer.
                        lastSummaryTime = System.currentTimeMillis();
                    }
                }
            } catch (Exception e) {
                LOG.error("Exception from main loop: %s", (Throwable) e);
            }
        }
    }

    /**
     * Removes managers that are no longer running and re-queues the tasks that
     * were assigned to them. The caller must hold this object's monitor.
     */
    private void resubmitTasksOfFailedManagers() {
        final Iterator<EnslavedRemoteManager> iterator = this.managers.iterator();
        while (iterator.hasNext()) {
            final EnslavedRemoteManager manager = iterator.next();
            if (manager.isRunning()) {
                continue;
            }
            LOG.info("Manager failed: [%d] %s", Integer.valueOf(manager.getId()), manager.getName());
            LOG.info("Resubmitting %d tasks...", Integer.valueOf(this.runningTasks.get(manager).size()));
            for (final Task task : this.runningTasks.get(manager)) {
                LOG.info("Resubmitted %s, %d", task.getClass().getName(), Long.valueOf(task.getId()));
                execute(task);
                this.totalRedone.getAndIncrement();
            }
            this.nonworkingManager.add(manager);
            iterator.remove();
            this.runningTasks.remove(manager);
        }
    }

    /**
     * Offers at most one queued task to each free manager, visiting managers
     * in ascending order of average execution time. Stops at the first manager
     * that refuses a task. The caller must hold this object's monitor.
     */
    private void dispatchQueuedTasks() {
        // Each average is read once into a pair before sorting, so the
        // comparator works on a fixed snapshot.
        final List<EnslavedRemoteManager> fastestFirst = this.managers.stream()
                .map(manager -> Pair.of(manager, Double.valueOf(manager.getExecutionTimeAverage())))
                .sorted((left, right) -> Double.compare(left.second().doubleValue(), right.second().doubleValue()))
                .map(entry -> entry.first())
                .collect(Collectors.toList());
        for (final EnslavedRemoteManager manager : fastestFirst) {
            if (!manager.existsFree() || this.queuedTasks.isEmpty()) {
                continue;
            }
            final Task next = this.queuedTasks.peek();
            if (!manager.execute(next)) {
                // The task stays at the head of the queue for the next iteration.
                LOG.info("Manager %d refused task %d", Integer.valueOf(manager.getId()), Long.valueOf(next.getId()));
                break;
            }
            this.queuedTasks.poll();
            this.runningTasks.get(manager).add(next);
            this.taskWorker.put(next, manager);
        }
    }

    /**
     * Overwrites the summary file with a timestamp line followed by the
     * current summary.
     *
     * @throws IOException if the file cannot be opened or closed
     */
    private void writeSummary() throws IOException {
        try (FileOutputStream fileStream = new FileOutputStream(this.summaryFile);
                PrintStream out = new PrintStream(fileStream)) {
            out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
            out.print(getSummary());
        }
    }

    /**
     * Adds a connection command that is sent to managers registering from now
     * on.
     *
     * @param map the command
     * @return always {@code true}
     */
    public boolean setupCommand(Map<String, String> map) {
        final List<Map<String, String>> commands = this.connectionCommands;
        commands.add(map);
        return true;
    }

    /**
     * Installs a new environment and pushes it to every registered manager.
     * Only possible while no task is outstanding.
     *
     * @param abstractEnvironment the new environment; must not be {@code null}
     *        (checked only when assertions are enabled)
     * @return {@code false} if tasks are outstanding (nothing is changed) or if
     *         at least one manager rejected the environment; {@code true}
     *         otherwise
     */
    public boolean setupEnviroment(AbstractEnvironment abstractEnvironment) {
        // Explicit form of an assert statement, kept as-is because the flag
        // field is declared explicitly in this class.
        if (!$assertionsDisabled && abstractEnvironment == null) {
            throw new AssertionError();
        }
        synchronized (this) {
            if (!canCreateBoundary()) {
                return false;
            }
            boolean acceptedByAll = true;
            synchronized (this.environmentLock) {
                this.currentEnviroment = abstractEnvironment;
                this.environmentUpdateIdGenerator.set(0);
                for (final EnslavedRemoteManager manager : this.managers) {
                    // Non-short-circuit: every manager is contacted even
                    // after one has refused.
                    acceptedByAll &= manager.setupEnviroment(this.currentEnviroment);
                }
            }
            return acceptedByAll;
        }
    }

    /**
     * Clears the running flag, terminates every registered manager and wakes
     * all threads waiting on this object's monitor.
     */
    @Override
    public void shutdown() {
        synchronized (this) {
            this.isRunning = false;
            for (final EnslavedRemoteManager manager : this.managers) {
                manager.terminate();
            }
            this.notifyAll();
        }
    }

    /**
     * Shuts the manager down and returns one runnable per task that is still
     * queued. Running a returned runnable executes the task locally against
     * the environment that is current at that time.
     *
     * <p>The queue is guarded by this object's monitor everywhere else in the
     * class, so the snapshot is taken under that monitor. The previous code
     * synchronized on the queue object, which excluded no other accessor.
     *
     * @return runnables wrapping the queued tasks, in queue order
     */
    @Override
    public List<Runnable> shutdownNow() {
        shutdown();
        final List<Runnable> pending = new LinkedList<>();
        synchronized (this) {
            for (final Task task : this.queuedTasks) {
                pending.add(() -> {
                    task.execute(getEnviroment());
                });
            }
        }
        return pending;
    }

    /**
     * Starts the connection-register thread and then the main-loop thread.
     */
    public void start() {
        final Thread acceptor = this.register;
        final Thread mainLoop = this.mythread;
        acceptor.start();
        mainLoop.start();
    }

    /**
     * Applies a single environment update; shorthand for the list variant.
     *
     * @param environmentConfig the update
     * @return the result of the list variant for a one-element list
     */
    public boolean updateEnviroment(EnvironmentConfig<?> environmentConfig) {
        final List<EnvironmentConfig<?>> single = ListUtils.createSingletonList(environmentConfig);
        return updateEnviroment(single);
    }

    /**
     * Applies a batch of environment updates: each update is serialized with a
     * fresh update id, the batch is sent to every registered manager and the
     * updates are then applied to the local environment. Only possible while
     * no task is outstanding.
     *
     * @param list updates to apply, in order
     * @return {@code false} if tasks are outstanding (nothing is changed) or if
     *         at least one manager rejected the batch; {@code true} otherwise
     * @throws RuntimeException wrapping the {@link IOException} if an update
     *         cannot be serialized
     */
    public boolean updateEnviroment(List<EnvironmentConfig<?>> list) {
        synchronized (this) {
            if (!canCreateBoundary()) {
                LOG.error("Trying to modify enviroment when boundary cannot be made. ");
                return false;
            }
            // Kept raw, as in the original: the element type expected by the
            // remote managers is not visible from this file.
            final ArrayList serialized = new ArrayList(list.size());
            for (final EnvironmentConfig<?> update : list) {
                try {
                    serialized.add(new SerializedEnvironmentConfig(update, this.environmentUpdateIdGenerator.getAndIncrement()));
                } catch (IOException e) {
                    LOG.error("Failed to serialize environment update: %s", (Throwable) e);
                    throw new RuntimeException(e);
                }
            }
            boolean acceptedByAll = true;
            synchronized (this.environmentLock) {
                for (final EnslavedRemoteManager manager : this.managers) {
                    // Non-short-circuit: every manager receives the batch.
                    acceptedByAll &= manager.updateEnviroment(Collections.unmodifiableList(serialized));
                }
                for (final EnvironmentConfig<?> update : list) {
                    this.currentEnviroment.update(update);
                }
            }
            return acceptedByAll;
        }
    }

    /**
     * Appends a task to the dispatch queue and wakes all threads waiting on
     * this object's monitor.
     *
     * @param task task to queue
     */
    private void execute(Task task) {
        synchronized (this) {
            final Queue<Task> queue = this.queuedTasks;
            queue.add(task);
            this.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Registers a newly connected manager: creates its running-task list,
     * sends it every connection command, sends it the current environment if
     * one is set, and only then adds it to the list of managers.
     *
     * @param enslavedRemoteManager the manager to register
     */
    public void registerManager(EnslavedRemoteManager enslavedRemoteManager) {
        synchronized (this) {
            this.runningTasks.put(enslavedRemoteManager, new ArrayList());
            for (final Map<String, String> command : this.connectionCommands) {
                enslavedRemoteManager.setupCommand(command);
            }
            synchronized (this.environmentLock) {
                final boolean hasEnvironment = this.currentEnviroment != null;
                if (hasEnvironment) {
                    enslavedRemoteManager.setupEnviroment(this.currentEnviroment);
                }
            }
            this.managers.add(enslavedRemoteManager);
            LOG.info("Added new manager: %s -> %s", Integer.valueOf(enslavedRemoteManager.getId()), enslavedRemoteManager.getName());
        }
    }

    static {
        $assertionsDisabled = !ReliableManager.class.desiredAssertionStatus();
        LOG = LoggerFactory.create((Class<?>) ReliableManager.class);
    }
}
