package pinorobotics.rtpstalk.impl.spec.behavior.reader;

import id.xfunction.Preconditions;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import pinorobotics.rtpstalk.RtpsTalkConfiguration;
import pinorobotics.rtpstalk.RtpsTalkMetrics;
import pinorobotics.rtpstalk.impl.RtpsDataPackager;
import pinorobotics.rtpstalk.impl.behavior.reader.DataFragmentReaderProcessor;
import pinorobotics.rtpstalk.impl.behavior.reader.FilterByEntityIdRtpsSubmessageVisitor;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.ReliabilityQosPolicy;
import pinorobotics.rtpstalk.impl.spec.messages.RtpsMessage;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.Data;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.DataFrag;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.GuidPrefix;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.ParameterList;
import pinorobotics.rtpstalk.impl.spec.messages.walk.Result;
import pinorobotics.rtpstalk.impl.spec.messages.walk.RtpsSubmessageVisitor;
import pinorobotics.rtpstalk.impl.spec.messages.walk.RtpsSubmessagesWalker;
import pinorobotics.rtpstalk.impl.spec.structure.RtpsEntity;
import pinorobotics.rtpstalk.impl.spec.structure.history.CacheChange;
import pinorobotics.rtpstalk.impl.spec.structure.history.HistoryCache;
import pinorobotics.rtpstalk.messages.RtpsTalkMessage;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/behavior/reader/RtpsReader.class */
/**
 * Reader endpoint that sits between an upstream publisher of {@link RtpsMessage}s and downstream
 * subscribers of user-level messages of type {@code D}.
 *
 * <p>As a {@link Flow.Subscriber} it receives RTPS messages one at a time (it requests exactly one
 * message after subscribing and one more after each processed message). Every message is walked
 * with {@link RtpsSubmessagesWalker} using a {@link FilterByEntityIdRtpsSubmessageVisitor} that
 * was constructed with this reader's entity id and delegates to this reader. Payloads extracted
 * from {@link Data} / {@link DataFrag} submessages are offered to the {@link HistoryCache}; when
 * the cache reports the change as added, its data value is submitted to this reader's own
 * subscribers through the inherited {@link SubmissionPublisher}.
 *
 * <p>NOTE(review): the {@code subscription} field is a plain, unsynchronized field; confirm that
 * {@link #onSubscribe}, {@link #onNext} and {@link #close()} are never raced by callers.
 *
 * @param <D> type of messages delivered to the subscribers of this reader
 */
public class RtpsReader<D extends RtpsTalkMessage> extends SubmissionPublisher<D>
        implements RtpsEntity, Flow.Subscriber<RtpsMessage>, RtpsSubmessageVisitor {

    /** Logger bound to this reader's tracing token; available to subclasses. */
    protected final XLogger logger;

    // Metrics instruments are created per instance (not static), exactly as in the original code.
    private final Meter meter = GlobalOpenTelemetry.getMeter(RtpsReader.class.getSimpleName());
    private final LongHistogram processTimeMeter =
            meter.histogramBuilder(RtpsTalkMetrics.PROCESS_TIME_METRIC)
                    .setDescription(RtpsTalkMetrics.PROCESS_TIME_METRIC_DESCRIPTION)
                    .ofLongs()
                    .build();
    private final LongHistogram dataMeter =
            meter.histogramBuilder(RtpsTalkMetrics.DATA_METRIC)
                    .setDescription(RtpsTalkMetrics.DATA_METRIC_DESCRIPTION)
                    .ofLongs()
                    .build();
    private final LongCounter readerCountMeter =
            meter.counterBuilder(RtpsTalkMetrics.RTPS_READER_COUNT_METRIC)
                    .setDescription(RtpsTalkMetrics.RTPS_READER_COUNT_METRIC_DESCRIPTION)
                    .build();

    private final HistoryCache<D> cache = new HistoryCache<>();
    private final RtpsSubmessagesWalker walker = new RtpsSubmessagesWalker();
    private final Guid guid;
    private final RtpsSubmessageVisitor filterVisitor;
    private final ReliabilityQosPolicy.Kind reliabilityKind;

    /** Upstream subscription; empty until {@link #onSubscribe} accepts one. */
    private Optional<Flow.Subscription> subscription = Optional.empty();

    private final TracingToken tracingToken;
    private final RtpsDataPackager<D> packager = new RtpsDataPackager<>();
    private final Class<D> messageType;
    private final DataFragmentReaderProcessor processor;

    /**
     * Creates a reader and increments the reader-count metric by one.
     *
     * @param rtpsTalkConfiguration supplies {@code publisherMaxBufferSize()} used as the maximum
     *     buffer capacity of the underlying {@link SubmissionPublisher}
     * @param tracingToken parent token; this reader derives its own token from it by appending
     *     the string form of {@code guid.entityId}
     * @param cls message type passed to {@link RtpsDataPackager#extractMessage} for every
     *     received {@link Data} submessage
     * @param executor executor used by the underlying {@link SubmissionPublisher} for delivery
     * @param guid GUID of this reader
     * @param kind reliability kind reported by {@link #getReliabilityKind()}
     */
    public RtpsReader(
            RtpsTalkConfiguration rtpsTalkConfiguration,
            TracingToken tracingToken,
            Class<D> cls,
            Executor executor,
            Guid guid,
            ReliabilityQosPolicy.Kind kind) {
        super(executor, rtpsTalkConfiguration.publisherMaxBufferSize());
        this.messageType = cls;
        this.tracingToken = new TracingToken(tracingToken, new String[] {guid.entityId.toString()});
        this.guid = guid;
        this.reliabilityKind = kind;
        this.processor = new DataFragmentReaderProcessor(this.tracingToken);
        this.filterVisitor = new FilterByEntityIdRtpsSubmessageVisitor(guid.entityId, this);
        this.logger = XLogger.getLogger(getClass(), this.tracingToken);
        readerCountMeter.add(1L);
    }

    /** Returns the history cache that stores the changes accepted by this reader. */
    public HistoryCache<D> getReaderCache() {
        return cache;
    }

    /** Returns the GUID this reader was created with. */
    @Override
    public Guid getGuid() {
        return guid;
    }

    /** Returns the reliability kind this reader was created with. */
    public ReliabilityQosPolicy.Kind getReliabilityKind() {
        return reliabilityKind;
    }

    /**
     * Handles a {@link Data} submessage: extracts the user message and, if one is present, adds
     * it as a change and then hands any inline QoS to {@link #processInlineQos}.
     *
     * <p>A {@link RtpsDataPackager.MessageTypeMismatchException} is logged as a warning and the
     * submessage is dropped; it is never propagated.
     *
     * @param guidPrefix prefix combined with {@code data.writerId} to form the writer GUID
     * @param data received submessage
     * @return always {@link Result#CONTINUE}
     */
    @Override
    public Result onData(GuidPrefix guidPrefix, Data data) {
        dataMeter.record(1L);
        logger.fine("Received data {0}", data);
        Guid writerGuid = new Guid(guidPrefix, data.writerId);
        try {
            packager.extractMessage(messageType, data)
                    .ifPresent(
                            message -> {
                                addChange(
                                        new CacheChange<>(
                                                writerGuid, data.writerSN.value, message));
                                data.inlineQos.ifPresent(
                                        inlineQos ->
                                                processInlineQos(writerGuid, message, inlineQos));
                            });
        } catch (RtpsDataPackager.MessageTypeMismatchException e) {
            boolean addressedToUnknownReader =
                    Objects.equals(
                            data.readerId, EntityId.Predefined.ENTITYID_UNKNOWN.getValue());
            if (addressedToUnknownReader) {
                logger.warning(
                        "Mismatch between message types. Message directed to reader ENTITYID_UNKNOWN but since current reader message type differs, it is ignored: {0}",
                        e.getMessage());
            } else {
                // Explicit array, as in the original call, so overload resolution stays on the
                // (String, Object[]) form even if XLogger also offers a (String, Throwable) one.
                logger.warning(
                        "Mismatch between message types, message will be ignored: {0}",
                        new Object[] {e});
            }
        }
        return Result.CONTINUE;
    }

    /**
     * Handles a {@link DataFrag} submessage by passing it to the fragment processor; when the
     * processor returns a message, that message is added as a change.
     *
     * @param guidPrefix prefix combined with {@code dataFrag.writerId} to form the writer GUID
     * @param dataFrag received fragment
     * @return always {@link Result#CONTINUE}
     */
    @Override
    public Result onDataFrag(GuidPrefix guidPrefix, DataFrag dataFrag) {
        dataMeter.record(1L);
        Guid writerGuid = new Guid(guidPrefix, dataFrag.writerId);
        // NOTE(review): the decompiler may have dropped an unchecked cast to D on the message
        // below - confirm against the return type of DataFragmentReaderProcessor#addDataFrag.
        processor
                .addDataFrag(writerGuid, dataFrag)
                .ifPresent(
                        message ->
                                addChange(
                                        new CacheChange<>(
                                                writerGuid, dataFrag.writerSN.value, message)));
        return Result.CONTINUE;
    }

    /**
     * Offers a change to the history cache and, when the cache reports it as added, submits its
     * data value to the subscribers of this reader. Changes are ignored once the reader is
     * closed.
     *
     * <p>NOTE(review): decompiler output reports this member was originally {@code protected};
     * it is left {@code public} so that existing callers keep compiling.
     *
     * @param cacheChange change to add
     * @return the value returned by {@link HistoryCache#addChange}, or {@code false} when the
     *     reader is already closed
     */
    public boolean addChange(CacheChange<D> cacheChange) {
        logger.entering("addChange");
        boolean added = false;
        if (isClosed()) {
            logger.fine("Reader is closed, ignoring the change");
        } else {
            added = cache.addChange(cacheChange);
            if (added) {
                logger.fine(
                        "Submitting new change with sequence number {0} to subscribers",
                        cacheChange.getSequenceNumber());
                submit(cacheChange.getDataValue());
            }
        }
        // Single exit point so the entering/exiting trace stays balanced on every normal return.
        logger.exiting("addChange");
        return added;
    }

    /**
     * Walks the submessages of the given message with the entity-id filtering visitor.
     *
     * <p>NOTE(review): decompiler output reports this member was originally {@code protected};
     * it is left {@code public} so that existing callers keep compiling.
     *
     * @param rtpsMessage message to process
     */
    public void process(RtpsMessage rtpsMessage) {
        walker.walk(rtpsMessage, filterVisitor);
    }

    /**
     * Accepts the upstream subscription and requests the first message.
     *
     * <p>Only one subscription is kept. If the reader already holds one, the newly offered
     * subscription is cancelled and the active one is left untouched, so that the previous
     * subscription is not silently leaked.
     *
     * @param subscription subscription offered by the upstream publisher; must not be null
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Preconditions.notNull(subscription, "Subscription must not be null");
        if (this.subscription.isPresent()) {
            logger.warning(
                    "Already subscribed, cancelling the additional subscription {0}",
                    subscription);
            subscription.cancel();
            return;
        }
        this.subscription = Optional.of(subscription);
        subscription.request(1L);
    }

    /**
     * Processes one upstream message, records how long processing took (in milliseconds) and
     * requests the next message.
     *
     * <p>Any {@link Exception} thrown by {@link #process} is logged and swallowed so that one bad
     * message does not stop the flow; other throwables propagate after the bookkeeping is done.
     *
     * @param rtpsMessage message delivered by the upstream publisher
     */
    @Override
    public void onNext(RtpsMessage rtpsMessage) {
        Instant startedAt = Instant.now();
        try {
            process(rtpsMessage);
        } catch (Exception e) {
            logger.severe(e);
        } finally {
            processTimeMeter.record(Duration.between(startedAt, Instant.now()).toMillis());
            // Without a subscription there is nobody to request from; do not fail on that.
            subscription.ifPresent(upstream -> upstream.request(1L));
        }
    }

    /**
     * Logs the upstream error and then throws {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): throwing from {@code onError} is kept as in the original; confirm it is
     * intentional, since subscriber callbacks are normally expected to return normally.
     *
     * @param th error signalled by the upstream publisher
     */
    @Override
    public void onError(Throwable th) {
        logger.severe(th);
        throw new UnsupportedOperationException();
    }

    /** Does nothing when the upstream publisher completes. */
    @Override
    public void onComplete() {}

    /**
     * Hook invoked from {@link #onData} after a change was offered, when the submessage carries
     * inline QoS. This implementation only logs that the inline QoS is ignored.
     *
     * @param guid GUID of the writer that sent the data
     * @param d message extracted from the submessage
     * @param parameterList inline QoS parameters of the submessage
     */
    protected void processInlineQos(Guid guid, D d, ParameterList parameterList) {
        logger.fine("Ignoring inlineQos");
    }

    /** Returns the string form of this reader's tracing token. */
    @Override
    public String toString() {
        return tracingToken.toString();
    }

    /** Returns the tracing token derived for this reader in the constructor. */
    public TracingToken getTracingToken() {
        return tracingToken;
    }

    /**
     * Cancels the upstream subscription (if any), then closes the underlying publisher.
     */
    @Override
    public void close() {
        subscription.ifPresent(Flow.Subscription::cancel);
        super.close();
        logger.fine("Closed");
    }
}
