package pinorobotics.rtpstalk.impl.topics;

import id.xfunction.Preconditions;
import id.xfunction.concurrent.flow.SimpleSubscriber;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import id.xfunction.util.ImmutableMultiMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import pinorobotics.rtpstalk.exceptions.RtpsTalkException;
import pinorobotics.rtpstalk.impl.RtpsTalkParameterListMessage;
import pinorobotics.rtpstalk.impl.TopicId;
import pinorobotics.rtpstalk.impl.messages.ProtocolParameterMap;
import pinorobotics.rtpstalk.impl.spec.behavior.ParticipantsRegistry;
import pinorobotics.rtpstalk.impl.spec.behavior.writer.StatefullReliableRtpsWriter;
import pinorobotics.rtpstalk.impl.spec.messages.DurabilityQosPolicy;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.ReliabilityQosPolicy;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityKind;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.GuidPrefix;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.ParameterId;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.ParameterList;
import pinorobotics.rtpstalk.impl.topics.ActorDetails;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/topics/AbstractTopicManager.class */
/**
 * Base class for managers which keep track of {@link Topic}s together with the local and remote
 * actors attached to them.
 *
 * <p>The manager is a subscriber of {@link RtpsTalkParameterListMessage} discovery messages: every
 * valid message is converted into {@link RemoteActorDetails} and registered with the topic it
 * refers to (the topic is created when it is not known yet). Local actors are registered through
 * {@link #addLocalActor(ActorDetails)}.
 *
 * @param <A> type of the local actor details handled by this manager
 */
public abstract class AbstractTopicManager<A extends ActorDetails> extends SimpleSubscriber<RtpsTalkParameterListMessage> implements AutoCloseable {
    protected XLogger logger;
    protected List<Topic<A>> topics = new CopyOnWriteArrayList<>();
    protected StatefullReliableRtpsWriter<RtpsTalkParameterListMessage> announcementsWriter;
    private final ParticipantsRegistry participantsRegistry;
    private final ActorDetails.Type actorsType;

    /**
     * @param tracingToken token passed to the logger created for the concrete manager class
     * @param statefullReliableRtpsWriter writer used to publish announcements about local actors
     * @param participantsRegistry registry consulted when a discovery message lacks participant
     *     level parameters
     * @param type kind of the local actors managed by this instance
     */
    public AbstractTopicManager(TracingToken tracingToken, StatefullReliableRtpsWriter<RtpsTalkParameterListMessage> statefullReliableRtpsWriter, ParticipantsRegistry participantsRegistry, ActorDetails.Type type) {
        this.announcementsWriter = statefullReliableRtpsWriter;
        this.participantsRegistry = participantsRegistry;
        this.actorsType = type;
        this.logger = XLogger.getLogger(getClass(), tracingToken);
    }

    /**
     * Registers a local actor with the topic identified by {@code a.topicId()}, creating the topic
     * when it does not exist yet.
     *
     * <p>When the topic has no local actors so far, the interest in the topic is announced and a
     * listener produced by {@link #createListener(Topic)} is attached before the actor is added.
     *
     * @param a details of the local actor
     * @return local entity id reported by the topic
     */
    public EntityId addLocalActor(A a) {
        TopicId topicId = a.topicId();
        this.logger.info("Adding {0} with following details {1}", new Object[]{this.actorsType, a});
        Topic<A> topic = createTopicIfMissing(topicId);
        if (!topic.hasLocalActors()) {
            announceTopicInterest(a, topic);
            topic.addListener(createListener(topic));
        }
        topic.addLocalActor(a);
        return topic.getLocalTopicEntityId();
    }

    /** Creates the listener which is attached to a topic when its first local actor is added. */
    protected abstract Consumer<TopicMatchEvent<A>> createListener(Topic<A> topic);

    /** Returns the already known topic matching {@code topicId} or creates and stores a new one. */
    private Topic<A> createTopicIfMissing(TopicId topicId) {
        return findTopicById(topicId).orElseGet(() -> {
            Topic<A> created = createTopic(topicId);
            this.topics.add(created);
            return created;
        });
    }

    /**
     * Looks up a known topic.
     *
     * @param topicId id to match against the known topics
     * @return a topic for which {@code isMatches(topicId)} holds, or empty if there is none
     */
    public Optional<Topic<A>> findTopicById(TopicId topicId) {
        return this.topics.stream().filter(topic -> topic.isMatches(topicId)).findAny();
    }

    /** Creates a new topic instance for the given id. */
    protected abstract Topic<A> createTopic(TopicId topicId);

    /**
     * Processes one discovery message and then requests the next one.
     *
     * <p>Exactly one item is requested per processed message, no matter whether the message was
     * accepted, ignored, or caused an exception: the request is issued only from the
     * {@code finally} block.
     *
     * @param rtpsTalkParameterListMessage received discovery message
     */
    public void onNext(RtpsTalkParameterListMessage rtpsTalkParameterListMessage) {
        try {
            ProtocolParameterMap protocolParameterMap = (ProtocolParameterMap) rtpsTalkParameterListMessage.parameterList().map(parameterList -> parameterList.getProtocolParameters()).orElse(null);
            if (protocolParameterMap != null) {
                processDiscoveryData(protocolParameterMap);
            }
        } finally {
            // Single place where demand is signalled, so it cannot be signalled twice per message.
            this.subscription.request(1L);
        }
    }

    /**
     * Validates the parameters of a discovery message and, when everything needed is available,
     * registers the described remote actor with its topic. Messages which cannot be used are logged
     * and ignored.
     */
    private void processDiscoveryData(ProtocolParameterMap protocolParameterMap) {
        if (!isValid(protocolParameterMap)) {
            this.logger.warning("Non valid publications data received, ignoring it");
            return;
        }
        String topicName = (String) protocolParameterMap.getFirstParameter(ParameterId.PID_TOPIC_NAME, String.class).orElseThrow(() -> {
            return new RtpsTalkException("Received subscription without PID_TOPIC_NAME");
        });
        String typeName = (String) protocolParameterMap.getFirstParameter(ParameterId.PID_TYPE_NAME, String.class).orElseThrow(() -> {
            return new RtpsTalkException("Received subscription without PID_TYPE_NAME");
        });
        TopicId topicId = new TopicId(topicName, typeName);
        Guid endpointGuid = (Guid) protocolParameterMap.getFirstParameter(ParameterId.PID_ENDPOINT_GUID, Guid.class).orElseThrow(() -> {
            return new RtpsTalkException("Received subscription without PID_ENDPOINT_GUID");
        });
        if (!isEndpointSupported(topicId, endpointGuid)) {
            this.logger.warning("Endpoint Guid {0} for topic {1} is not supported", new Object[]{endpointGuid, topicId});
            return;
        }
        Guid participantGuid = (Guid) findParticipantInfo(endpointGuid.guidPrefix, protocolParameterMap, ParameterId.PID_PARTICIPANT_GUID).stream().findFirst().orElse(null);
        if (participantGuid == null) {
            this.logger.warning("Could not find participant for endpoint Guid {0}, ignoring subscription for topic {1}", new Object[]{endpointGuid, topicId});
            return;
        }
        List<?> unicastLocators = findParticipantInfo(endpointGuid.guidPrefix, protocolParameterMap, ParameterId.PID_DEFAULT_UNICAST_LOCATOR);
        if (unicastLocators.isEmpty()) {
            this.logger.warning("Could not find locator for endpoint Guid {0}, ignoring subscription for topic {1}", new Object[]{endpointGuid, topicId});
            return;
        }
        Preconditions.equals(participantGuid.guidPrefix, endpointGuid.guidPrefix, "Guid prefix missmatch for topic %s", new Object[]{topicName});
        RemoteActorDetails remoteActorDetails = new RemoteActorDetails(endpointGuid, unicastLocators, protocolParameterMap.getReliabilityKind().orElseGet(() -> {
            this.logger.warning("Received subscription without ReliabilityQosPolicy, assuming BEST_EFFORT by default...");
            return ReliabilityQosPolicy.Kind.BEST_EFFORT;
        }), protocolParameterMap.getDurabilityKind().orElseGet(() -> {
            this.logger.warning("Received subscription without DurabilityQosPolicy, assuming VOLATILE_DURABILITY_QOS by default...");
            return DurabilityQosPolicy.Kind.VOLATILE_DURABILITY_QOS;
        }));
        // The discovered remote actor is of the opposite kind to the local actors managed here.
        ActorDetails.Type remoteActorsType = this.actorsType == ActorDetails.Type.Publisher ? ActorDetails.Type.Subscriber : ActorDetails.Type.Publisher;
        this.logger.info("Discovered {0} for topic {1} type {2} with following details {3}", new Object[]{remoteActorsType, topicName, typeName, remoteActorDetails});
        createTopicIfMissing(topicId).addRemoteActor(remoteActorDetails);
    }

    /**
     * Tells whether the remote endpoint can be used. Endpoints whose entity kind equals
     * {@code EntityKind.READER} or {@code EntityKind.WRITER} are rejected with a warning (per the
     * warning text these are the keyed kinds; only NOKEY readers and writers are supported).
     */
    private boolean isEndpointSupported(TopicId topicId, Guid guid) {
        boolean keyedEndpoint = guid.entityId.entityKind() == EntityKind.READER.getValue()
                || guid.entityId.entityKind() == EntityKind.WRITER.getValue();
        if (keyedEndpoint) {
            this.logger.warning("Remote participant for topic {0} is using keyed endpoint and will be ignored. Only NOKEY readers and writers are supported. Check that there is no fields in IDL files annotated with @key.", new Object[]{topicId});
        }
        return !keyedEndpoint;
    }

    /**
     * Returns the values of {@code parameterId} from the received parameters or, when the message
     * does not carry them, from the data stored in the Participants Registry for the participant
     * with the given guid prefix.
     *
     * @return found values, possibly empty; an empty list when the participant is not registered
     */
    private List<?> findParticipantInfo(GuidPrefix guidPrefix, ImmutableMultiMap<ParameterId, Object> immutableMultiMap, ParameterId parameterId) {
        List<?> received = immutableMultiMap.get(parameterId);
        if (!received.isEmpty()) {
            return received;
        }
        this.logger.fine("Received subscription without {0}, trying to find it in Participants Registry by guid prefix {1}", new Object[]{parameterId, guidPrefix});
        ParameterList participantData = this.participantsRegistry.getSpdpDiscoveredParticipantData(guidPrefix).orElse(null);
        if (participantData == null) {
            this.logger.fine("Could not find participant with guid prefix {0} in Participants Registry", new Object[]{guidPrefix});
            return List.of();
        }
        return participantData.getProtocolParameters().get(parameterId);
    }

    /** Builds the parameter list which is published to announce the local actor on the topic. */
    protected abstract ParameterList createAnnouncementData(A a, Topic<A> topic);

    /**
     * Publishes an announcement for the local actor through the announcements writer.
     *
     * @param a details of the local actor
     * @param topic topic the actor is interested in
     * @return value returned by the writer's {@code newChange} call
     */
    public long announceTopicInterest(A a, Topic<A> topic) {
        return this.announcementsWriter.newChange(new RtpsTalkParameterListMessage(createAnnouncementData(a, topic)));
    }

    /** A message is usable only when it names the topic, its type and the endpoint guid. */
    private boolean isValid(ProtocolParameterMap protocolParameterMap) {
        return protocolParameterMap.containsKey(ParameterId.PID_TOPIC_NAME)
                && protocolParameterMap.containsKey(ParameterId.PID_TYPE_NAME)
                && protocolParameterMap.containsKey(ParameterId.PID_ENDPOINT_GUID);
    }

    /** Does nothing in this base class. */
    @Override // java.lang.AutoCloseable
    public void close() {
    }
}
