package edu.cornell.cs.nlp.spf.explat;

import edu.cornell.cs.nlp.spf.base.concurrency.ITinyExecutor;
import edu.cornell.cs.nlp.spf.base.concurrency.Shutdownable;
import edu.cornell.cs.nlp.spf.base.concurrency.TinyExecutorService;
import edu.cornell.cs.nlp.spf.explat.resources.ResourceCreatorRepository;
import edu.cornell.cs.nlp.utils.log.ILogger;
import edu.cornell.cs.nlp.utils.log.LoggerFactory;
import edu.cornell.cs.nlp.utils.log.thread.LoggingThreadFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:edu/cornell/cs/nlp/spf/explat/DistributedExperiment.class */
public abstract class DistributedExperiment extends LoggedExperiment implements IJobListener, ITinyExecutor {
    public static final ILogger LOG = LoggerFactory.create((Class<?>) DistributedExperiment.class);
    private final Set<String> completedIds;
    private final Object completionSignalObject;
    private final TinyExecutorService executor;
    private final List<Job> jobs;
    private final Set<String> launchedIds;
    private boolean running;
    private final boolean serial;
    private final long startingTime;
    private boolean success;
    private final List<Shutdownable> toShutdown;

    public DistributedExperiment(File file, Map<String, String> map, ResourceCreatorRepository resourceCreatorRepository) throws IOException {
        super(file, map, resourceCreatorRepository);
        this.completedIds = new HashSet();
        this.completionSignalObject = new Object();
        this.jobs = new LinkedList();
        this.launchedIds = new HashSet();
        this.running = true;
        this.startingTime = System.currentTimeMillis();
        this.success = true;
        this.toShutdown = new LinkedList();
        this.serial = this.globalParams.getAsBoolean("serial");
        this.executor = new TinyExecutorService(this.globalParams.contains("expThreads") ? Integer.valueOf(this.globalParams.get("expThreads")).intValue() : Runtime.getRuntime().availableProcessors(), new LoggingThreadFactory(), this.globalParams.contains("threadMonitorPolling") ? Long.valueOf(this.globalParams.get("threadMonitorPolling")).longValue() : 1000L);
    }

    public DistributedExperiment(File file, ResourceCreatorRepository resourceCreatorRepository) throws IOException {
        this(file, Collections.emptyMap(), resourceCreatorRepository);
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.Shutdownable
    public boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.executor.awaitTermination(j, timeUnit);
    }

    @Override // edu.cornell.cs.nlp.spf.explat.LoggedExperiment
    public void end() {
        for (Shutdownable shutdownable : this.toShutdown) {
            LOG.info("Shutting down %s", shutdownable.getClass().getName());
            shutdownable.shutdown();
        }
        this.executor.shutdown();
        super.end();
    }

    @Override // java.util.concurrent.Executor
    public void execute(Runnable runnable) {
        this.executor.execute(runnable);
    }

    public ITinyExecutor getExecutor() {
        return this.executor;
    }

    public ExecutorService getExecutorService() {
        return this.executor;
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.ITinyExecutor
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection) throws InterruptedException {
        return this.executor.invokeAll(collection);
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.ITinyExecutor
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection, long j, TimeUnit timeUnit) throws InterruptedException {
        return this.executor.invokeAll(collection, j, timeUnit);
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.ITinyExecutor
    public <T> List<Future<T>> invokeAllWithUniqueTimeout(Collection<? extends Callable<T>> collection, long j) throws InterruptedException {
        return this.executor.invokeAllWithUniqueTimeout(collection, j);
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.Shutdownable
    public boolean isShutdown() {
        return this.executor.isShutdown();
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.Shutdownable
    public boolean isTerminated() {
        return this.executor.isTerminated();
    }

    @Override // edu.cornell.cs.nlp.spf.explat.IJobListener
    public void jobCompleted(Job job) {
        boolean z = true;
        synchronized (this.jobs) {
            this.completedIds.add(job.getId());
            if (this.running) {
                for (Job job2 : this.jobs) {
                    z &= job2.isCompleted();
                    if (!this.launchedIds.contains(job2.getId()) && this.completedIds.containsAll(job2.getDependencyIds())) {
                        this.executor.execute(job2);
                        this.launchedIds.add(job2.getId());
                        if (this.serial) {
                            break;
                        }
                    }
                }
            }
            if (z) {
                this.running = false;
            }
        }
        if (z || !this.running) {
            synchronized (this.completionSignalObject) {
                this.completionSignalObject.notifyAll();
            }
        }
    }

    @Override // edu.cornell.cs.nlp.spf.explat.IJobListener
    public void jobException(Job job, Throwable th) {
        synchronized (this.jobs) {
            this.running = false;
        }
        LOG.error("Job %s threw an exception: %s", job.getId(), th.getMessage());
        StringWriter stringWriter = new StringWriter();
        th.printStackTrace(new PrintWriter(stringWriter));
        LOG.error(stringWriter.toString());
        jobCompleted(job);
        this.success = false;
    }

    @Override // edu.cornell.cs.nlp.spf.explat.ParameterizedExperiment
    public void readResrouces() {
        try {
            super.readResrouces();
        } catch (RuntimeException e) {
            end();
            throw e;
        }
    }

    public void registerForShutdown(Shutdownable shutdownable) {
        this.toShutdown.add(shutdownable);
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.Shutdownable
    public void shutdown() {
        this.executor.shutdown();
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.Shutdownable
    public List<Runnable> shutdownNow() {
        return this.executor.shutdownNow();
    }

    public void start() {
        synchronized (this.completionSignalObject) {
            boolean z = false;
            synchronized (this.jobs) {
                for (Job job : this.jobs) {
                    if (job.getDependencyIds().isEmpty()) {
                        z = true;
                        this.executor.execute(job);
                        this.launchedIds.add(job.getId());
                        if (this.serial) {
                            break;
                        }
                    }
                }
            }
            if (z) {
                LOG.info("Started jobs");
                try {
                    this.completionSignalObject.wait();
                    LOG.info("Jobs completed - ending experiment");
                    end();
                    LOG.info("Experiment completed");
                    LOG.info("Total run time %.4f seconds", Double.valueOf((System.currentTimeMillis() - this.startingTime) / 1000.0d));
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } else {
                LOG.info("No jobs started");
            }
        }
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.ITinyExecutor
    public <T> Future<T> submit(Callable<T> callable) {
        return this.executor.submit(callable);
    }

    @Override // edu.cornell.cs.nlp.spf.base.concurrency.ITinyExecutor
    public <T> Future<T> submit(Callable<T> callable, long j) {
        return this.executor.submit(callable, j);
    }

    public boolean success() {
        return this.success;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addJob(Job job) {
        this.jobs.add(job);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public File createJobLogFile(String str) {
        return new File(this.outputDir, String.format("%s.log", str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public File createJobOutputFile(String str) {
        return new File(this.outputDir, String.format("%s.out", str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // edu.cornell.cs.nlp.spf.explat.ParameterizedExperiment
    public <T> void storeResource(String str, T t) {
        if (t instanceof Shutdownable) {
            registerForShutdown((Shutdownable) t);
        }
        super.storeResource(str, t);
    }
}
