package pinorobotics.rtpstalk.impl.spec.transport;

import id.xfunction.Preconditions;
import id.xfunction.concurrent.NamedThreadFactory;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import pinorobotics.rtpstalk.RtpsTalkConfiguration;
import pinorobotics.rtpstalk.impl.spec.messages.RtpsMessage;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/transport/RtpsMessageReceiver.class */
/**
 * Publishes {@link RtpsMessage}s read from a {@link DataChannel} to {@link Flow.Subscriber}s.
 *
 * <p>Messages are read in a loop running on a dedicated single-thread executor owned by this
 * receiver and are handed to subscribers through {@link SubmissionPublisher#submit(Object)}.
 *
 * <p>Lifecycle: {@link #start(DataChannel)} may be called once; {@link #close()} stops the
 * reading loop, closes the data channel and closes the publisher.
 */
public class RtpsMessageReceiver extends SubmissionPublisher<RtpsMessage> implements AutoCloseable {
    private final XLogger logger;

    /** Runs the blocking receive loop; distinct from the executor used to deliver to subscribers. */
    private final ExecutorService executor;

    // Both flags are written by the thread calling start()/close() and read by the receive
    // loop thread, so they must be volatile to be reliably visible across threads.
    private volatile boolean isStarted;
    private volatile boolean isClosed;

    // Written in start() before the volatile write to isStarted and read in close() only after
    // isStarted is observed as true, which orders the write before the read.
    private DataChannel dataChannel;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a receiver which is not yet reading from any channel.
     *
     * @param rtpsTalkConfiguration supplies the maximum buffer size for the publisher
     * @param tracingToken used to name the receive thread and to tag log records
     * @param executor passed to {@link SubmissionPublisher} for delivering items to subscribers
     */
    public RtpsMessageReceiver(RtpsTalkConfiguration rtpsTalkConfiguration, TracingToken tracingToken, Executor executor) {
        super(executor, rtpsTalkConfiguration.publisherMaxBufferSize());
        this.executor = Executors.newSingleThreadExecutor(new NamedThreadFactory(tracingToken.toString()));
        this.logger = XLogger.getLogger(getClass(), tracingToken);
    }

    /**
     * Starts reading messages from the given channel on the receiver's own thread.
     *
     * <p>The "Already started" precondition is validated before any state is modified, so a
     * rejected repeated call does not replace the channel which {@link #close()} will close.
     *
     * @param dataChannel channel to read incoming RTPS messages from
     * @throws IOException declared for callers; not thrown directly by the code visible here
     */
    public void start(DataChannel dataChannel) throws IOException {
        this.logger.entering("start");
        Preconditions.isTrue(!this.isStarted, "Already started");
        this.dataChannel = dataChannel;
        this.executor.execute(() -> receiveLoop(dataChannel));
        this.isStarted = true;
    }

    /** Reads messages from the channel and submits them until the receive executor is shut down. */
    private void receiveLoop(DataChannel channel) {
        Thread currentThread = Thread.currentThread();
        this.logger.fine("Running {0} on thread {1} with id {2}", new Object[]{getClass().getSimpleName(), currentThread.getName(), Long.valueOf(currentThread.getId())});
        while (!this.executor.isShutdown()) {
            try {
                RtpsMessage message = channel.receive();
                this.logger.fine("Incoming RTPS message {0}", new Object[]{message});
                // isClosed() here is the SubmissionPublisher state, not the isClosed field:
                // submitting to a closed publisher is rejected, so skip it.
                if (!isClosed()) {
                    submit(message);
                }
            } catch (AsynchronousCloseException e) {
                // Expected when close() closes the channel under a blocked receive;
                // report it only when it was not caused by our own close().
                if (!this.isClosed) {
                    this.logger.severe(e);
                }
            } catch (Exception e) {
                // Keep the loop alive: a single failed receive must not stop the receiver.
                this.logger.severe(e);
            }
        }
        this.logger.fine("Shutdown received, stopping...");
    }

    /**
     * Logs the subscriber and registers it with the underlying publisher.
     *
     * @param subscriber subscriber to add
     */
    @Override // java.util.concurrent.SubmissionPublisher, java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super RtpsMessage> subscriber) {
        this.logger.fine("Subscribing {0}", new Object[]{subscriber});
        super.subscribe(subscriber);
    }

    /**
     * Stops the receiver: shuts down the receive executor, closes the data channel and then
     * closes the publisher. Does nothing if the receiver was never started or is already closed.
     */
    @Override // java.util.concurrent.SubmissionPublisher, java.lang.AutoCloseable
    public void close() {
        if (!this.isStarted || this.isClosed) {
            return;
        }
        // Set the flag first so the receive loop treats the resulting
        // AsynchronousCloseException as expected.
        this.isClosed = true;
        this.logger.fine("Closing executor");
        this.executor.shutdown();
        this.logger.fine("Closing data channel");
        this.dataChannel.close();
        super.close();
        this.logger.fine("Closed");
    }
}
