package pinorobotics.rtpstalk.impl.spec.userdata;

import id.xfunction.Preconditions;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.function.Predicate;
import pinorobotics.rtpstalk.impl.PublisherDetails;
import pinorobotics.rtpstalk.impl.RtpsNetworkInterface;
import pinorobotics.rtpstalk.impl.RtpsTalkConfigurationInternal;
import pinorobotics.rtpstalk.impl.spec.behavior.OperatingEntities;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.Locator;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.transport.DataChannelFactory;
import pinorobotics.rtpstalk.impl.spec.transport.RtpsMessageReceiver;
import pinorobotics.rtpstalk.impl.spec.transport.RtpsMessageReceiverFactory;
import pinorobotics.rtpstalk.messages.RtpsTalkDataMessage;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/userdata/UserDataService.class */
/**
 * Manages the user-data readers and writers of a participant and routes them through a single
 * {@link RtpsMessageReceiver}.
 *
 * <p>Lifecycle: construct, call {@link #start(TracingToken, RtpsNetworkInterface)} once, then use
 * {@link #subscribeToRemoteWriter} / {@link #publish}, and finally {@link #close()}.
 *
 * <p>This class performs no synchronization of its own; the reader and writer registries are
 * plain {@link HashMap}s.
 */
public class UserDataService implements AutoCloseable {
    private final RtpsTalkConfigurationInternal config;
    private final Executor publisherExecutor;
    private final DataChannelFactory channelFactory;
    private final DataObjectsFactory dataObjectsFactory;
    private final RtpsMessageReceiverFactory receiverFactory;

    /** Readers created so far, keyed by the entity id passed to {@link #subscribeToRemoteWriter}. */
    private final Map<EntityId, DataReader> readers = new HashMap<>();

    /** Writers created so far, keyed by the entity id passed to {@link #publish}. */
    private final Map<EntityId, DataWriter> writers = new HashMap<>();

    // The fields below are only assigned in start(); they are null/false before that.
    private XLogger logger;
    private RtpsMessageReceiver receiver;
    private OperatingEntities operatingEntities;
    private TracingToken tracingToken;
    private boolean isStarted;

    /**
     * Stores the collaborators; nothing is created or started until {@link #start} is called.
     *
     * @param rtpsTalkConfigurationInternal configuration handed to the created readers, writers
     *     and receiver
     * @param executor executor passed to every reader, writer and receiver this service creates
     * @param dataChannelFactory channel factory passed to created writers
     * @param dataObjectsFactory factory used to create {@link DataReader}s and {@link DataWriter}s
     * @param rtpsMessageReceiverFactory factory used to create the message receiver in
     *     {@link #start}
     */
    public UserDataService(RtpsTalkConfigurationInternal rtpsTalkConfigurationInternal, Executor executor, DataChannelFactory dataChannelFactory, DataObjectsFactory dataObjectsFactory, RtpsMessageReceiverFactory rtpsMessageReceiverFactory) {
        this.config = rtpsTalkConfigurationInternal;
        this.publisherExecutor = executor;
        this.channelFactory = dataChannelFactory;
        this.dataObjectsFactory = dataObjectsFactory;
        this.receiverFactory = rtpsMessageReceiverFactory;
    }

    /**
     * Registers a remote writer with the reader for {@code entityId} (creating that reader on
     * first use) and attaches {@code subscriber} to it.
     *
     * <p>Both the subscriber-to-reader and the reader-to-receiver subscriptions are only made if
     * they are not already in place, so repeated calls with the same arguments do not
     * subscribe twice.
     *
     * @param entityId key under which the reader is looked up or created
     * @param list locators passed to the reader together with {@code guid}
     * @param guid guid of the matched remote writer
     * @param subscriber subscriber to attach to the reader
     * @throws RuntimeException if the service has not been started (raised by the precondition
     *     check; exact type is defined by {@code Preconditions})
     */
    public void subscribeToRemoteWriter(EntityId entityId, List<Locator> list, Guid guid, Flow.Subscriber<RtpsTalkDataMessage> subscriber) {
        Preconditions.isTrue(this.isStarted, "User data service is not started");
        DataReader reader = this.readers.computeIfAbsent(
                entityId,
                readerEntityId -> this.dataObjectsFactory.newDataReader(
                        this.config.publicConfig(),
                        this.tracingToken,
                        this.publisherExecutor,
                        this.operatingEntities,
                        readerEntityId));
        reader.matchedWriterAdd(guid, list);
        if (!reader.isSubscribed(subscriber)) {
            reader.subscribe(subscriber);
        }
        if (!this.receiver.isSubscribed(reader)) {
            this.receiver.subscribe(reader);
        }
    }

    /**
     * Creates a writer for {@code entityId}, subscribes it to the publisher carried by
     * {@code publisherDetails}, and subscribes the writer's own reader to the receiver.
     *
     * @param entityId key of the new writer; must not already have a writer registered
     * @param entityId2 not used by this method
     * @param publisherDetails supplies the publisher to subscribe to and the QoS policy for the
     *     new writer
     * @throws RuntimeException if the service has not been started or a writer for
     *     {@code entityId} already exists (raised by the precondition checks; exact type is
     *     defined by {@code Preconditions})
     */
    public void publish(EntityId entityId, EntityId entityId2, PublisherDetails publisherDetails) {
        Preconditions.isTrue(this.isStarted, "User data service is not started");
        Preconditions.isTrue(!this.writers.containsKey(entityId), "Publisher for entity id %s already exist", new Object[]{entityId});
        DataWriter writer = this.dataObjectsFactory.newDataWriter(
                this.config,
                this.tracingToken,
                this.publisherExecutor,
                this.channelFactory,
                this.operatingEntities,
                entityId,
                publisherDetails.qosPolicy());
        // Register first so the writer is tracked (and later closed) even if a subscribe fails.
        this.writers.put(entityId, writer);
        publisherDetails.publisher().subscribe(writer);
        this.receiver.subscribe(writer.getWriterReader());
    }

    /**
     * Creates the message receiver, starts it on the default unicast channel of
     * {@code rtpsNetworkInterface}, and marks the service as started.
     *
     * <p>The started flag is set last, so if any step throws the service stays not-started.
     *
     * @param tracingToken token used for this service's logger and as parent of the receiver's
     *     token
     * @param rtpsNetworkInterface supplies the unicast channel and the operating entities
     * @throws IOException declared for failures of the underlying calls
     * @throws RuntimeException if the service was already started (raised by the precondition
     *     check; exact type is defined by {@code Preconditions})
     */
    public void start(TracingToken tracingToken, RtpsNetworkInterface rtpsNetworkInterface) throws IOException {
        Preconditions.isTrue(!this.isStarted, "Already started");
        this.tracingToken = tracingToken;
        this.logger = XLogger.getLogger(getClass(), this.tracingToken);
        this.receiver = this.receiverFactory.newRtpsMessageReceiver(
                this.config.publicConfig(),
                new TracingToken(this.tracingToken, new String[]{"UserDataServiceReceiver"}),
                this.publisherExecutor);
        this.logger.entering("start");
        this.logger.fine("Starting user service on {0}", new Object[]{rtpsNetworkInterface.getLocalDefaultUnicastLocator()});
        this.receiver.start(rtpsNetworkInterface.getDefaultUnicastChannel());
        this.operatingEntities = rtpsNetworkInterface.getOperatingEntities();
        this.isStarted = true;
    }

    /**
     * Closes all writers, then the receiver, then all readers. Does nothing if the service was
     * never started.
     */
    @Override
    public void close() {
        if (!this.isStarted) {
            return;
        }
        this.logger.fine("Closing");
        closeDataWriters();
        this.receiver.close();
        this.readers.values().forEach(DataReader::close);
    }

    /**
     * Closes every registered writer: first those whose entity id is not builtin, then the
     * builtin ones.
     *
     * <p>Does nothing if the service was never started. In that state no writer can have been
     * registered (see {@link #publish}) and the logger is not yet initialized, so there is
     * nothing to close and nothing to log to.
     */
    public void closeDataWriters() {
        if (!this.isStarted) {
            return;
        }
        Predicate<DataWriter> isBuiltin = writer -> writer.getGuid().entityId.isBuiltin();
        this.logger.fine("Closing non builtin writers");
        this.writers.values().stream().filter(isBuiltin.negate()).forEach(DataWriter::close);
        this.logger.fine("Closing builtin writers");
        this.writers.values().stream().filter(isBuiltin).forEach(DataWriter::close);
    }
}
