package pinorobotics.rtpstalk.impl.topics;

import id.xfunction.Preconditions;
import id.xfunction.lang.XThread;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import pinorobotics.rtpstalk.impl.PublisherDetails;
import pinorobotics.rtpstalk.impl.RtpsNetworkInterface;
import pinorobotics.rtpstalk.impl.RtpsTalkConfigurationInternal;
import pinorobotics.rtpstalk.impl.RtpsTalkParameterListMessage;
import pinorobotics.rtpstalk.impl.TopicId;
import pinorobotics.rtpstalk.impl.spec.behavior.OperatingEntities;
import pinorobotics.rtpstalk.impl.spec.behavior.writer.ReaderProxy;
import pinorobotics.rtpstalk.impl.spec.behavior.writer.StatefullReliableRtpsWriter;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityKind;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.GuidPrefix;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.ParameterList;
import pinorobotics.rtpstalk.impl.spec.userdata.UserDataService;
import pinorobotics.rtpstalk.impl.topics.ActorDetails;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/topics/TopicPublicationsManager.class */
public class TopicPublicationsManager extends AbstractTopicManager<PublisherDetails> {
    private XLogger logger;
    private SedpDataFactory dataFactory;
    private RtpsNetworkInterface networkIface;
    private UserDataService userService;
    private OperatingEntities operatingEntities;
    private TracingToken tracingToken;
    private Map<TopicId, Long> announcementSeqNums;
    private RtpsTalkConfigurationInternal config;

    public TopicPublicationsManager(TracingToken tracingToken, RtpsTalkConfigurationInternal rtpsTalkConfigurationInternal, RtpsNetworkInterface rtpsNetworkInterface, StatefullReliableRtpsWriter<RtpsTalkParameterListMessage> statefullReliableRtpsWriter, UserDataService userDataService) {
        super(tracingToken, statefullReliableRtpsWriter, rtpsNetworkInterface.getParticipantsRegistry(), ActorDetails.Type.Publisher);
        this.announcementSeqNums = new HashMap();
        this.tracingToken = tracingToken;
        this.config = rtpsTalkConfigurationInternal;
        this.dataFactory = new SedpDataFactory(rtpsTalkConfigurationInternal);
        this.networkIface = rtpsNetworkInterface;
        this.operatingEntities = rtpsNetworkInterface.getOperatingEntities();
        this.userService = userDataService;
        this.logger = XLogger.getLogger(getClass(), tracingToken);
    }

    @Override // pinorobotics.rtpstalk.impl.topics.AbstractTopicManager
    public EntityId addLocalActor(PublisherDetails publisherDetails) {
        TopicId topicId = publisherDetails.topicId();
        Preconditions.isTrue(!((Boolean) findTopicById(topicId).map((v0) -> {
            return v0.hasLocalActors();
        }).orElse(false)).booleanValue(), "Only one local writer per topic %s is allowed", new Object[]{topicId});
        Topic<PublisherDetails> createTopic = createTopic(topicId);
        this.userService.publish(createTopic.getLocalTopicEntityId(), this.operatingEntities.getReaders().assignNewEntityId(topicId, EntityKind.READER_NO_KEY), publisherDetails);
        EntityId addLocalActor = super.addLocalActor((TopicPublicationsManager) publisherDetails);
        Preconditions.equals(addLocalActor, createTopic.getLocalTopicEntityId(), "Same local topic with different entity ids");
        return addLocalActor;
    }

    @Override // pinorobotics.rtpstalk.impl.topics.AbstractTopicManager
    protected Consumer<TopicMatchEvent<PublisherDetails>> createListener(Topic<PublisherDetails> topic) {
        return topicMatchEvent -> {
            TopicId topicId = topic.getTopicId();
            RemoteActorDetails remoteActor = topicMatchEvent.remoteActor();
            this.logger.fine("New match event between local publisher and remote subscriber {0} for topic id {1}", new Object[]{remoteActor, topicId});
            StatefullReliableRtpsWriter<?> orCreateWriter = getOrCreateWriter(topic, (PublisherDetails) topicMatchEvent.localActor());
            waitForTopicBeAnnouncedToReader(remoteActor.endpointGuid().guidPrefix, topic.getTopicId()).thenRun(() -> {
                try {
                    orCreateWriter.matchedReaderAdd(remoteActor.endpointGuid(), remoteActor.writerUnicastLocator(), remoteActor.reliabilityKind());
                } catch (IOException e) {
                    this.logger.severe(e);
                }
            });
        };
    }

    private StatefullReliableRtpsWriter<?> getOrCreateWriter(Topic<PublisherDetails> topic, PublisherDetails publisherDetails) {
        TopicId topicId = topic.getTopicId();
        return this.operatingEntities.getWriters().findEntity(topicId).or(() -> {
            this.userService.publish(topic.getLocalTopicEntityId(), this.operatingEntities.getReaders().assignNewEntityId(topicId, EntityKind.READER_NO_KEY), publisherDetails);
            return this.operatingEntities.getWriters().findEntity(topicId);
        }).orElseThrow(() -> {
            return new RuntimeException("Could not register local data writer for topic " + topic);
        });
    }

    /* JADX WARN: Type inference failed for: r0v13, types: [pinorobotics.rtpstalk.impl.topics.TopicPublicationsManager$1] */
    private CompletionStage<Void> waitForTopicBeAnnouncedToReader(GuidPrefix guidPrefix, final TopicId topicId) {
        final CompletableFuture completableFuture = new CompletableFuture();
        final Long l = this.announcementSeqNums.get(topicId);
        if (l == null) {
            completableFuture.completeExceptionally(new RuntimeException("Cannot find topic " + topicId + " announcement sequence number"));
            return completableFuture;
        }
        final Guid guid = new Guid(guidPrefix, EntityId.Predefined.ENTITYID_SEDP_BUILTIN_PUBLICATIONS_DETECTOR);
        final ReaderProxy orElse = this.announcementsWriter.matchedReaderLookup(guid).orElse(null);
        if (orElse == null) {
            completableFuture.completeExceptionally(new RuntimeException("Reader " + guid + " is not known to SEDP"));
            return completableFuture;
        }
        new Thread(this.tracingToken.toString()) { // from class: pinorobotics.rtpstalk.impl.topics.TopicPublicationsManager.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                int i = 5;
                while (true) {
                    if (orElse.getHighestAckedSeqNum() >= l.longValue()) {
                        break;
                    }
                    TopicPublicationsManager.this.logger.fine("Waiting for topic {0} to be annouced to the reader {1}, current retry {2}", new Object[]{topicId, guid, Integer.valueOf(i)});
                    XThread.sleep(TopicPublicationsManager.this.config.publicConfig().heartbeatPeriod().toMillis());
                    i--;
                    if (i < 0) {
                        completableFuture.completeExceptionally(new RuntimeException("Topic " + topicId + " was not announced to the Reader " + guid + " in time"));
                        break;
                    }
                }
                completableFuture.complete(null);
            }
        }.start();
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // pinorobotics.rtpstalk.impl.topics.AbstractTopicManager
    public long announceTopicInterest(PublisherDetails publisherDetails, Topic<PublisherDetails> topic) {
        long announceTopicInterest = super.announceTopicInterest((TopicPublicationsManager) publisherDetails, (Topic<TopicPublicationsManager>) topic);
        this.announcementSeqNums.put(topic.getTopicId(), Long.valueOf(announceTopicInterest));
        return announceTopicInterest;
    }

    @Override // pinorobotics.rtpstalk.impl.topics.AbstractTopicManager
    protected Topic<PublisherDetails> createTopic(TopicId topicId) {
        return new Topic<>(topicId, this.operatingEntities.getWriters().assignNewEntityId(topicId, EntityKind.WRITER_NO_KEY));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // pinorobotics.rtpstalk.impl.topics.AbstractTopicManager
    public ParameterList createAnnouncementData(PublisherDetails publisherDetails, Topic<PublisherDetails> topic) {
        return this.dataFactory.createDiscoveredWriterData(topic.getTopicId(), topic.getLocalTopicEntityId(), this.networkIface.getLocalDefaultUnicastLocator(), publisherDetails.qosPolicy());
    }
}
