package id.xfunction.concurrent.flow;

import id.xfunction.Preconditions;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Function;

/* loaded from: input_file:id/xfunction/concurrent/flow/TransformProcessor.class */
public class TransformProcessor<T, R> extends SynchronousPublisher<R> implements Flow.Processor<T, R> {
    private Flow.Subscription subscription;
    private Function<T, Optional<R>> transformer;
    private Exception ctorStackTrace = new Exception("Original exception belongs to the processor which was created with this stack trace");

    public TransformProcessor(Function<T, Optional<R>> function) {
        this.transformer = function;
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        Preconditions.isNull(this.subscription, "Already subscribed. Created from " + this.ctorStackTrace, new Object[0]);
        this.subscription = subscription;
        subscription.request(1L);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        try {
            try {
                this.transformer.apply(t).ifPresent(this::submit);
                this.subscription.request(1L);
            } catch (Exception e) {
                includeSource(e);
                throw e;
            }
        } catch (Throwable th) {
            this.subscription.request(1L);
            throw th;
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        includeSource(th);
        closeExceptionally(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        close();
    }

    private void includeSource(Throwable th) {
        if (Arrays.stream(th.getSuppressed()).noneMatch(th2 -> {
            return th2 == this.ctorStackTrace;
        })) {
            th.addSuppressed(this.ctorStackTrace);
        }
    }
}
