package pinorobotics.rtpstalk.impl.spec.transport;

import id.xfunction.concurrent.flow.SimpleSubscriber;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import java.util.List;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.RtpsMessage;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.GuidPrefix;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/transport/RtpsMessageSender.class */
public class RtpsMessageSender extends SimpleSubscriber<MessageBuilder> implements AutoCloseable {
    private final XLogger logger;
    private DataChannel dataChannel;
    private Guid remoteReader;
    private EntityId writerEntityId;

    /* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/transport/RtpsMessageSender$MessageBuilder.class */
    public interface MessageBuilder {
        default GuidPrefix getReaderGuidPrefix() {
            return GuidPrefix.Predefined.GUIDPREFIX_UNKNOWN.getValue();
        }

        List<RtpsMessage> build(EntityId entityId, EntityId entityId2);
    }

    public RtpsMessageSender(TracingToken tracingToken, DataChannel dataChannel, Guid guid, EntityId entityId) {
        this.dataChannel = dataChannel;
        this.remoteReader = guid;
        this.writerEntityId = entityId;
        this.logger = XLogger.getLogger(getClass(), tracingToken);
    }

    public void onNext(MessageBuilder messageBuilder) {
        this.logger.entering("onNext");
        try {
            send(messageBuilder);
            this.logger.exiting("onNext");
        } finally {
            this.subscription.request(1L);
        }
    }

    private void send(MessageBuilder messageBuilder) {
        try {
            GuidPrefix readerGuidPrefix = messageBuilder.getReaderGuidPrefix();
            if (readerGuidPrefix == GuidPrefix.Predefined.GUIDPREFIX_UNKNOWN.getValue() || readerGuidPrefix.equals(this.remoteReader.guidPrefix)) {
                messageBuilder.build(this.remoteReader.entityId, this.writerEntityId).forEach(this::send);
            } else {
                this.logger.fine("Not sending message since it belongs to different participant {0}", new Object[]{readerGuidPrefix});
            }
        } catch (Exception e) {
            this.logger.severe(e);
        }
    }

    public void onComplete() {
        close();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.subscription != null) {
            this.subscription.cancel();
        }
        this.dataChannel.close();
        this.logger.fine("Closed");
    }

    private void send(RtpsMessage rtpsMessage) {
        this.dataChannel.send(this.remoteReader, rtpsMessage);
    }

    public void replay(MessageBuilder messageBuilder) {
        send(messageBuilder);
    }
}
