package pinorobotics.rtpstalk.impl;

import id.xfunction.Preconditions;
import id.xfunction.lang.XRE;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import pinorobotics.rtpstalk.RtpsTalkConfiguration;
import pinorobotics.rtpstalk.impl.qos.ReaderQosPolicySet;
import pinorobotics.rtpstalk.impl.qos.WriterQosPolicySet;
import pinorobotics.rtpstalk.impl.spec.discovery.sedp.MetatrafficUnicastService;
import pinorobotics.rtpstalk.impl.spec.discovery.spdp.MetatrafficMulticastService;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.transport.DataChannelFactory;
import pinorobotics.rtpstalk.impl.spec.transport.RtpsMessageReceiverFactory;
import pinorobotics.rtpstalk.impl.spec.userdata.DataObjectsFactory;
import pinorobotics.rtpstalk.impl.spec.userdata.UserDataService;
import pinorobotics.rtpstalk.impl.topics.TopicPublicationsManager;
import pinorobotics.rtpstalk.impl.topics.TopicSubscriptionsManager;
import pinorobotics.rtpstalk.messages.RtpsTalkDataMessage;
import pinorobotics.rtpstalk.qos.PublisherQosPolicy;
import pinorobotics.rtpstalk.qos.SubscriberQosPolicy;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/RtpsServiceManager.class */
public class RtpsServiceManager implements AutoCloseable {
    private RtpsTalkConfigurationInternal config;
    private boolean isStarted;
    private DataChannelFactory channelFactory;
    private List<MetatrafficMulticastService> spdpServices = new ArrayList();
    private MetatrafficUnicastService sedpService;
    private UserDataService userService;
    private TopicSubscriptionsManager subscriptionsManager;
    private TopicPublicationsManager publicationsManager;
    private XLogger logger;
    private RtpsMessageReceiverFactory receiverFactory;
    private RtpsNetworkInterfaceFactory networkIfaceFactory;
    private ExecutorService publisherExecutor;

    public RtpsServiceManager(RtpsTalkConfigurationInternal rtpsTalkConfigurationInternal, DataChannelFactory dataChannelFactory, RtpsMessageReceiverFactory rtpsMessageReceiverFactory) {
        this.config = rtpsTalkConfigurationInternal;
        this.channelFactory = dataChannelFactory;
        this.receiverFactory = rtpsMessageReceiverFactory;
        this.networkIfaceFactory = new RtpsNetworkInterfaceFactory(rtpsTalkConfigurationInternal.publicConfig(), dataChannelFactory);
    }

    public void startAll(TracingToken tracingToken) {
        Preconditions.isTrue(!this.isStarted, "All services already started");
        this.logger = XLogger.getLogger(getClass(), tracingToken);
        this.logger.entering("start");
        this.logger.fine("Using following configuration: {0}", new Object[]{this.config});
        this.publisherExecutor = this.config.publicConfig().publisherExecutor().orElseGet(RtpsTalkConfiguration.Builder.DEFAULT_PUBLISHER_EXECUTOR);
        try {
            RtpsNetworkInterface createRtpsNetworkInterface = this.networkIfaceFactory.createRtpsNetworkInterface(tracingToken);
            this.sedpService = new MetatrafficUnicastService(this.config, this.publisherExecutor, this.channelFactory, this.receiverFactory);
            this.userService = new UserDataService(this.config, this.publisherExecutor, this.channelFactory, new DataObjectsFactory(), this.receiverFactory);
            this.sedpService.start(tracingToken, createRtpsNetworkInterface);
            this.userService.start(tracingToken, createRtpsNetworkInterface);
            this.subscriptionsManager = new TopicSubscriptionsManager(tracingToken, this.config, createRtpsNetworkInterface, this.sedpService.getSubscriptionsWriter(), this.userService);
            this.sedpService.getPublicationsReader().subscribe(this.subscriptionsManager);
            this.publicationsManager = new TopicPublicationsManager(tracingToken, this.config, createRtpsNetworkInterface, this.sedpService.getPublicationsWriter(), this.userService);
            this.sedpService.getSubscriptionsReader().subscribe(this.publicationsManager);
            startSpdp(tracingToken, createRtpsNetworkInterface);
        } catch (Exception e) {
            this.logger.severe("Failed to start one of the RTPS services", e);
        }
        this.isStarted = true;
    }

    private void startSpdp(TracingToken tracingToken, RtpsNetworkInterface rtpsNetworkInterface) throws Exception {
        List<NetworkInterface> list = (List) this.config.publicConfig().networkInterface().map((v0) -> {
            return List.of(v0);
        }).orElseGet(() -> {
            return InternalUtils.getInstance().listAllNetworkInterfaces();
        });
        Preconditions.isTrue(!list.isEmpty(), "No network interfaces found");
        this.logger.fine("Starting SPDP on following network interfaces {0}", new Object[]{list});
        for (NetworkInterface networkInterface : list) {
            try {
                MetatrafficMulticastService metatrafficMulticastService = new MetatrafficMulticastService(this.config, this.publisherExecutor, this.channelFactory, this.receiverFactory);
                metatrafficMulticastService.start(tracingToken, rtpsNetworkInterface, networkInterface, this.sedpService.newSedpConfigurator());
                this.spdpServices.add(metatrafficMulticastService);
            } catch (Exception e) {
                this.logger.severe("Error starting SPDP on network interface " + networkInterface, e);
            }
        }
    }

    public EntityId subscribe(String str, String str2, SubscriberQosPolicy subscriberQosPolicy, Flow.Subscriber<RtpsTalkDataMessage> subscriber) {
        return this.subscriptionsManager.addLocalActor(new SubscriberDetails(new TopicId(str, str2), new ReaderQosPolicySet(subscriberQosPolicy), subscriber));
    }

    public void publish(String str, String str2, PublisherQosPolicy publisherQosPolicy, Flow.Publisher<RtpsTalkDataMessage> publisher) {
        this.publicationsManager.addLocalActor(new PublisherDetails(new TopicId(str, str2), new WriterQosPolicySet(publisherQosPolicy), publisher));
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.isStarted) {
            this.logger.fine("Closing");
            this.publicationsManager.close();
            this.subscriptionsManager.close();
            this.userService.closeDataWriters();
            this.sedpService.close();
            this.spdpServices.forEach((v0) -> {
                v0.close();
            });
            this.userService.close();
            if (this.config.publicConfig().publisherExecutor().isEmpty()) {
                this.logger.fine("Closing publisherExecutor");
                this.publisherExecutor.shutdown();
                try {
                    if (!this.publisherExecutor.awaitTermination(1L, TimeUnit.MINUTES)) {
                        throw new XRE("Timeout waiting publisher executor service to shutdown");
                    }
                } catch (InterruptedException e) {
                    this.logger.severe("Error on close", e);
                }
            } else {
                this.logger.fine("Not closing publisherExecutor as it is managed by the user");
            }
            this.logger.fine("Closed");
        }
    }
}
