package id.xfunction.concurrent.flow;

import id.xfunction.logging.XLogger;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:id/xfunction/concurrent/flow/MergeProcessor.class */
public class MergeProcessor<T> extends SubmissionPublisher<T> {
    private AtomicInteger numOfActiveSubscriptions;
    private List<Throwable> failed;

    public MergeProcessor() {
        super(Executors.newCachedThreadPool(), 32);
        this.numOfActiveSubscriptions = new AtomicInteger();
        this.failed = new CopyOnWriteArrayList();
    }

    public Flow.Subscriber<T> newSubscriber() {
        return new Flow.Subscriber<T>() { // from class: id.xfunction.concurrent.flow.MergeProcessor.1
            private final XLogger LOGGER = XLogger.getLogger(this);
            private Flow.Subscription subscription;

            @Override // java.util.concurrent.Flow.Subscriber
            public void onSubscribe(Flow.Subscription subscription) {
                this.LOGGER.entering("onSubscribe");
                this.subscription = subscription;
                MergeProcessor.this.numOfActiveSubscriptions.incrementAndGet();
                this.subscription.request(1L);
            }

            @Override // java.util.concurrent.Flow.Subscriber
            public void onNext(T t) {
                this.LOGGER.entering("onNext");
                if (noSubscribers()) {
                    return;
                }
                try {
                    MergeProcessor.this.submit(t);
                } finally {
                    this.subscription.request(1L);
                }
            }

            @Override // java.util.concurrent.Flow.Subscriber
            public void onError(Throwable th) {
                this.LOGGER.entering("onError");
                if (noSubscribers()) {
                    return;
                }
                MergeProcessor.this.failed.add(th);
                if (MergeProcessor.this.numOfActiveSubscriptions.decrementAndGet() == 0) {
                    MergeProcessor.this.closeExceptionally(MergeProcessor.this.aggregateExceptions());
                }
            }

            @Override // java.util.concurrent.Flow.Subscriber
            public void onComplete() {
                this.LOGGER.entering("onComplete");
                if (!noSubscribers() && MergeProcessor.this.numOfActiveSubscriptions.decrementAndGet() == 0) {
                    if (MergeProcessor.this.failed.isEmpty()) {
                        MergeProcessor.this.close();
                    } else {
                        MergeProcessor.this.closeExceptionally(MergeProcessor.this.aggregateExceptions());
                    }
                }
            }

            private boolean noSubscribers() {
                if (!MergeProcessor.this.isClosed() || MergeProcessor.this.getNumberOfSubscribers() != 0) {
                    return false;
                }
                this.LOGGER.fine("No more subscribers, canceling subscription and closing processor");
                this.subscription.cancel();
                MergeProcessor.this.close();
                return true;
            }
        };
    }

    public int getNumOfActiveSubscriptions() {
        return this.numOfActiveSubscriptions.get();
    }

    private Exception aggregateExceptions() {
        Exception exc = new Exception("Some of the publishers terminated with errors (see suppressed exceptions)");
        List<Throwable> list = this.failed;
        Objects.requireNonNull(exc);
        list.forEach(exc::addSuppressed);
        return exc;
    }

    @Override // java.util.concurrent.SubmissionPublisher, java.lang.AutoCloseable
    public void close() {
        super.close();
        ExecutorService executorService = (ExecutorService) getExecutor();
        executorService.shutdown();
        try {
            executorService.awaitTermination(Integer.parseInt(System.getProperty("awaitMergeProcessorInSecs", "5")), TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
