package pinorobotics.rtpstalk.impl.spec.behavior.writer;

import id.xfunction.Preconditions;
import id.xfunction.concurrent.NamedThreadFactory;
import id.xfunction.lang.XThread;
import id.xfunction.logging.TracingToken;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import pinorobotics.rtpstalk.RtpsTalkConfiguration;
import pinorobotics.rtpstalk.RtpsTalkMetrics;
import pinorobotics.rtpstalk.WriterSettings;
import pinorobotics.rtpstalk.impl.RtpsTalkConfigurationInternal;
import pinorobotics.rtpstalk.impl.behavior.writer.RtpsDataMessageBuilder;
import pinorobotics.rtpstalk.impl.behavior.writer.RtpsHeartbeatMessageBuilder;
import pinorobotics.rtpstalk.impl.qos.ReaderQosPolicySet;
import pinorobotics.rtpstalk.impl.qos.WriterQosPolicySet;
import pinorobotics.rtpstalk.impl.spec.RtpsSpecReference;
import pinorobotics.rtpstalk.impl.spec.behavior.LocalOperatingEntities;
import pinorobotics.rtpstalk.impl.spec.messages.DurabilityQosPolicy;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.Locator;
import pinorobotics.rtpstalk.impl.spec.messages.ReliabilityQosPolicy;
import pinorobotics.rtpstalk.impl.spec.messages.RtpsMessage;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.RepresentationIdentifier;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.GuidPrefix;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.ProtocolVersion;
import pinorobotics.rtpstalk.impl.spec.structure.history.CacheChange;
import pinorobotics.rtpstalk.impl.spec.structure.history.HistoryCache;
import pinorobotics.rtpstalk.impl.spec.transport.DataChannelFactory;
import pinorobotics.rtpstalk.impl.spec.transport.RtpsMessageSender;
import pinorobotics.rtpstalk.messages.RtpsTalkMessage;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/behavior/writer/StatefullReliableRtpsWriter.class */
public class StatefullReliableRtpsWriter<D extends RtpsTalkMessage> extends RtpsWriter<D> implements Runnable, AutoCloseable {
    /** OpenTelemetry meter obtained in the constructor under this class's simple name. */
    private final Meter METER;
    /** Histogram that receives the value 1 after every heartbeat round submitted by {@code sendHeartbeats()}. */
    private final LongHistogram HEARTBEATS_METER;
    /** Single-threaded scheduler on which {@link #run()} is executed with a fixed delay. */
    private final ScheduledExecutorService executor;
    /** Proxies of the currently matched remote readers, keyed by the reader Guid. */
    private final Map<Guid, ReaderProxy> matchedReaders;
    /** Delay between heartbeat rounds; also the polling interval used by {@link #close()} while it waits. */
    private final Duration heartbeatPeriod;
    /** Cache holding the changes submitted through {@link #newChange}. */
    private final HistoryCache<D> historyCache;
    /** Counter passed to every heartbeat message builder; starts at 1 and grows by one per heartbeat round. */
    private int heartbeatCount;
    /** Factory used to open a data channel towards a newly matched reader. */
    private final DataChannelFactory channelFactory;
    /** Registry of local entities; this writer adds itself on construction and removes itself on close. */
    private final LocalOperatingEntities operatingEntities;
    /** Upper bound checked by {@link #request()}: more items are requested only while the cache holds fewer changes. */
    private final int historyCacheMaxSize;
    /** Subscriber exposed through {@link #getWriterReader()}. */
    private final WriterRtpsReader<D> writerReader;
    /** QoS policies of this writer; its durability kind controls history replay and cache cleanup. */
    private final WriterQosPolicySet qosPolicy;
    /** Writer settings; {@code pushMode()} is consulted in {@link #sendLastChangeToAllReaders()}. */
    private final WriterSettings writerSettings;
    /**
     * Set to {@code true} by {@link #close()}.
     *
     * <p>Declared {@code volatile} because {@link #close()} runs on the caller's thread while
     * {@link #cleanupCache()} reads the flag from the heartbeat executor thread; {@link #close()}
     * then waits for the cache to drain, which depends on that thread observing the new value.
     */
    private volatile boolean isClosed;

    /* renamed from: pinorobotics.rtpstalk.impl.spec.behavior.writer.StatefullReliableRtpsWriter$1, reason: invalid class name */
    /* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/behavior/writer/StatefullReliableRtpsWriter$1.class */
    /**
     * Compiler-generated lookup table for switching over {@link ReliabilityQosPolicy.Kind}.
     *
     * <p>The table is indexed by the enum ordinal and holds the case number used by the switch:
     * 1 for {@code RELIABLE}, 2 for {@code BEST_EFFORT}. Entries of any other constant stay 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$pinorobotics$rtpstalk$impl$spec$messages$ReliabilityQosPolicy$Kind;

        static {
            int[] caseNumbers = new int[ReliabilityQosPolicy.Kind.values().length];
            try {
                caseNumbers[ReliabilityQosPolicy.Kind.RELIABLE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: its entry keeps the default value 0
            }
            try {
                caseNumbers[ReliabilityQosPolicy.Kind.BEST_EFFORT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime: its entry keeps the default value 0
            }
            $SwitchMap$pinorobotics$rtpstalk$impl$spec$messages$ReliabilityQosPolicy$Kind = caseNumbers;
        }
    }

    /**
     * Creates the writer and registers it among the local writers of {@code localOperatingEntities}.
     *
     * @param rtpsTalkConfigurationInternal configuration; its public part supplies the heartbeat
     *     period and the maximum history cache size
     * @param tracingToken tracing token forwarded to the parent writer
     * @param executor executor forwarded to the parent writer
     * @param dataChannelFactory factory used later to connect to matched readers
     * @param localOperatingEntities registry to which this writer adds itself
     * @param entityId entity id forwarded to the parent writer
     * @param writerQosPolicySet QoS of this writer; its reliability kind must be {@code RELIABLE}
     * @param writerSettings writer settings (push mode)
     */
    public StatefullReliableRtpsWriter(
            RtpsTalkConfigurationInternal rtpsTalkConfigurationInternal,
            TracingToken tracingToken,
            Executor executor,
            DataChannelFactory dataChannelFactory,
            LocalOperatingEntities localOperatingEntities,
            EntityId entityId,
            WriterQosPolicySet writerQosPolicySet,
            WriterSettings writerSettings) {
        super(rtpsTalkConfigurationInternal, tracingToken, executor, entityId);
        // Validate the QoS first so that an unsupported policy fails before anything is allocated.
        Preconditions.equals(writerQosPolicySet.reliabilityKind(), ReliabilityQosPolicy.Kind.RELIABLE);
        this.METER = GlobalOpenTelemetry.getMeter(StatefullReliableRtpsWriter.class.getSimpleName());
        this.HEARTBEATS_METER = this.METER.histogramBuilder(RtpsTalkMetrics.HEARTBEATS_METRIC).setDescription(RtpsTalkMetrics.HEARTBEATS_METRIC_DESCRIPTION).ofLongs().build();
        this.executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("SpdpBuiltinParticipantWriter"));
        // Diamond instead of the raw type keeps the map type-checked as Map<Guid, ReaderProxy>.
        this.matchedReaders = new ConcurrentHashMap<>();
        this.heartbeatCount = 1;
        this.writerSettings = writerSettings;
        this.channelFactory = dataChannelFactory;
        this.operatingEntities = localOperatingEntities;
        this.heartbeatPeriod = rtpsTalkConfigurationInternal.publicConfig().heartbeatPeriod();
        this.historyCacheMaxSize = rtpsTalkConfigurationInternal.publicConfig().historyCacheMaxSize();
        this.historyCache = new HistoryCache<>(getTracingToken());
        localOperatingEntities.getLocalWriters().add(this);
        this.writerReader = new WriterRtpsReader<>(getTracingToken(), this);
        this.qosPolicy = writerQosPolicySet;
    }

    /**
     * Gives access to the history cache into which {@link #newChange} stores submitted changes.
     *
     * @return the cache instance owned by this writer
     */
    public HistoryCache<D> getWriterCache() {
        return historyCache;
    }

    /**
     * Hands the message to the parent writer and then stores it in the history cache under the
     * writer's Guid and the last change number reported by the parent.
     *
     * @param d message to publish
     * @return the value returned by the parent implementation
     */
    @Override
    public long newChange(D d) {
        final long parentResult = super.newChange(d);
        logger.fine("New change submitted");
        historyCache.addChange(new CacheChange<>(getGuid(), getLastChangeNumber(), d));
        return parentResult;
    }

    /**
     * Tells whether the periodic heartbeat task has already been handed to the executor.
     * Read and written only inside the synchronized {@link #matchedReaderAdd}.
     */
    private boolean heartbeatScheduled;

    /**
     * Registers a remote reader with this writer.
     *
     * <p>A reader that is already registered is ignored. Otherwise a sender is connected to the
     * first locator of {@code list}, a proxy matching the reader's reliability kind is created,
     * history is replayed to it when applicable, and the sender is subscribed to this writer.
     * The periodic heartbeat task is started when the first reader is added and is never
     * scheduled a second time, even if all readers are removed and new ones are added later.
     *
     * @param guid Guid of the remote reader
     * @param list locators of the remote reader; the first one is used for the connection
     * @param readerQosPolicySet QoS policies announced by the reader
     * @throws IOException declared by the original signature (propagated from channel creation)
     * @throws UnsupportedOperationException if the reader's reliability kind is neither
     *     {@code RELIABLE} nor {@code BEST_EFFORT}
     */
    public synchronized void matchedReaderAdd(Guid guid, List<Locator> list, ReaderQosPolicySet readerQosPolicySet) throws IOException {
        if (this.matchedReaders.containsKey(guid)) {
            this.logger.fine("Reader {0} is already registered with the writer, not adding it", new Object[]{guid});
            return;
        }
        RtpsMessageSender rtpsMessageSender = new RtpsMessageSender(getTracingToken(), this.channelFactory.connect(getTracingToken(), list.get(0)), guid, getGuid().entityId);
        ReliabilityQosPolicy.Kind reliabilityKind = readerQosPolicySet.reliabilityKind();
        ReaderProxy readerProxy;
        // Switch on the enum itself rather than on an ordinal lookup table with unrelated
        // numeric constants as case labels.
        switch (reliabilityKind) {
            case RELIABLE:
                readerProxy = new ReliableReaderProxy(guid, list, rtpsMessageSender, readerQosPolicySet);
                break;
            case BEST_EFFORT:
                readerProxy = new BestEffortReaderProxy(guid, list, rtpsMessageSender, readerQosPolicySet);
                break;
            default:
                throw new UnsupportedOperationException("ReliabilityQosPolicy " + reliabilityKind);
        }
        replayHistoryCacheIfNeeded(readerProxy);
        this.logger.fine("Adding reader proxy for reader with guid {0} and reliability {1}", new Object[]{readerProxy.getRemoteReaderGuid(), reliabilityKind});
        this.matchedReaders.put(readerProxy.getRemoteReaderGuid(), readerProxy);
        subscribe(readerProxy.getSender());
        // The map was empty the first time we get here, so this still starts the heartbeat with
        // the first reader; the flag prevents a duplicate periodic task once the reader count
        // drops back to zero and grows again.
        if (!this.heartbeatScheduled) {
            this.executor.scheduleWithFixedDelay(this, 0L, this.heartbeatPeriod.toMillis(), TimeUnit.MILLISECONDS);
            this.heartbeatScheduled = true;
        }
    }

    /**
     * Replays every cached change of this writer to the given reader, but only when this writer's
     * durability is TRANSIENT_LOCAL and the reader's reliability is BEST_EFFORT.
     *
     * @param readerProxy proxy of the reader that may receive the replay
     */
    private void replayHistoryCacheIfNeeded(ReaderProxy readerProxy) {
        if (qosPolicy.durabilityKind() != DurabilityQosPolicy.Kind.TRANSIENT_LOCAL_DURABILITY_QOS) {
            return;
        }
        if (readerProxy.getQosPolicy().reliabilityKind() != ReliabilityQosPolicy.Kind.BEST_EFFORT) {
            return;
        }
        logger.fine("Replaying history changes for Reader {0}", new Object[]{readerProxy.getRemoteReaderGuid()});
        RtpsDataMessageBuilder replayBuilder =
                new RtpsDataMessageBuilder(
                        getConfig(),
                        getTracingToken(),
                        getGuid().guidPrefix,
                        readerProxy.getRemoteReaderGuid().guidPrefix);
        // Changes are added in the order returned by the cache (sorted by sequence number).
        historyCache
                .getAllSortedBySeqNum(getGuid())
                .forEach(change -> replayBuilder.add(change.getSequenceNumber(), change.getDataValue()));
        readerProxy.getSender().replay((RtpsMessageSender.MessageBuilder) replayBuilder);
    }

    /**
     * Unregisters the reader with the given Guid, closes its proxy and triggers a cache cleanup.
     *
     * @param guid Guid of the reader to remove
     * @return {@code true} if the reader was registered and has been removed, {@code false} if it
     *     was unknown
     */
    public boolean matchedReaderRemove(Guid guid) {
        ReaderProxy removedProxy = matchedReaders.remove(guid);
        if (removedProxy != null) {
            removedProxy.close();
            cleanupCache();
            logger.fine("Matched reader {0} is removed", new Object[]{guid});
            return true;
        }
        logger.fine("Trying to remove unknown matched reader {0}, ignoring...", new Object[]{guid});
        return false;
    }

    /**
     * Removes every matched reader whose Guid carries the given prefix and logs how many were
     * actually removed.
     *
     * @param guidPrefix prefix identifying the readers to remove
     */
    public void matchedReadersRemove(GuidPrefix guidPrefix) {
        logger.fine("Removing all matched readers with guidPrefix {0}", new Object[]{guidPrefix});
        // The second filter performs the removal; only successful removals are counted.
        long removedCount =
                matchedReaders.keySet().stream()
                        .filter(guid -> guid.guidPrefix.equals(guidPrefix))
                        .filter(this::matchedReaderRemove)
                        .count();
        logger.fine("Removed {0} matched readers with guidPrefix {1}", new Object[]{Long.valueOf(removedCount), guidPrefix});
    }

    /**
     * Looks up the proxy of a matched reader.
     *
     * @param guid Guid of the reader
     * @return the proxy, or an empty Optional when no reader with that Guid is registered
     */
    public Optional<ReaderProxy> matchedReaderLookup(Guid guid) {
        ReaderProxy proxy = matchedReaders.get(guid);
        return Optional.ofNullable(proxy);
    }

    /**
     * One heartbeat cycle: resends requested changes, submits heartbeats and cleans up the cache.
     * Does nothing once the executor has been shut down. Any exception is logged instead of being
     * propagated.
     */
    @Override
    public void run() {
        try {
            if (!executor.isShutdown()) {
                sendRequested();
                sendHeartbeats();
                cleanupCache();
            }
        } catch (Exception e) {
            logger.severe("Writer heartbeat error", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Drops changes from the history cache that no longer need to be kept and then asks for more
     * items via {@link #request()}.
     *
     * <p>Nothing happens while the writer's durability is TRANSIENT_LOCAL and the writer is not
     * closed. With no matched readers, everything below the last change number is removed.
     * Otherwise everything up to and including the lowest sequence number acknowledged by all
     * readers is removed; if that minimum is 0 the method returns without removing or requesting.
     */
    public void cleanupCache() {
        boolean transientLocal =
                qosPolicy.durabilityKind() == DurabilityQosPolicy.Kind.TRANSIENT_LOCAL_DURABILITY_QOS;
        if (transientLocal && !isClosed) {
            return;
        }
        if (matchedReaders.isEmpty()) {
            logger.fine("Cleaning up all changes since there is no matched readers available");
            historyCache.removeAllBelow(getLastChangeNumber());
            request();
            return;
        }
        long ackedByAll =
                matchedReaders.values().stream()
                        .mapToLong(proxy -> proxy.getHighestAckedSeqNum())
                        .min()
                        .orElse(0L);
        if (ackedByAll == 0) {
            return;
        }
        // Long.MAX_VALUE is replaced by the last change number of this writer.
        long cleanupUpTo = ackedByAll == Long.MAX_VALUE ? getLastChangeNumber() : ackedByAll;
        logger.fine("Cleaning up all changes up to and including {0} since they are acknowledged by all the readers", new Object[]{Long.valueOf(cleanupUpTo)});
        historyCache.removeAllBelow(cleanupUpTo + 1);
        request();
    }

    /**
     * Closes the writer; repeated calls return immediately.
     *
     * <p>After cancelling the subscription and marking the writer closed, the method polls the
     * history cache once per heartbeat period until it holds no changes of this writer or until
     * there are no matched readers left. It then closes the parent, removes the writer from the
     * local writers registry, shuts the executor down and cancels the subscription of the
     * writer's reader, if present.
     */
    @Override
    public void close() {
        if (isClosed) {
            return;
        }
        logger.fine("Closing");
        cancelSubscription();
        isClosed = true;
        int pendingChanges = historyCache.getNumberOfChanges(getGuid());
        while (pendingChanges != 0) {
            if (matchedReaders.isEmpty()) {
                logger.fine("Discarding pending changes since there is no matched readers available");
                break;
            }
            logger.fine("Waiting for {0} pending changes [{1}..{2}] in the history cache to be sent", new Object[]{Integer.valueOf(pendingChanges), Long.valueOf(historyCache.getSeqNumMin(getGuid())), Long.valueOf(historyCache.getSeqNumMax(getGuid()))});
            XThread.sleep(heartbeatPeriod.toMillis());
            pendingChanges = historyCache.getNumberOfChanges(getGuid());
        }
        super.close();
        operatingEntities.getLocalWriters().remove(getGuid().entityId);
        executor.shutdown();
        writerReader.getSubscription().ifPresent(subscription -> subscription.cancel());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delegates to the parent's {@code request()} only while the history cache holds fewer
     * changes of this writer than the configured maximum; otherwise does nothing.
     */
    @Override
    public void request() {
        int cachedChanges = historyCache.getNumberOfChanges(getGuid());
        if (cachedChanges >= historyCacheMaxSize) {
            return;
        }
        super.request();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Sends the last message to the readers.
     *
     * <p>In push mode the parent implementation is invoked first. Independently of the mode, a
     * data message containing the last message is built and submitted for every matched
     * best-effort reader.
     */
    @Override
    @RtpsSpecReference(paragraph = "8.4.2.2", protocolVersion = ProtocolVersion.Predefined.Version_2_3, text = "Writers must not send data out-of-order")
    public void sendLastChangeToAllReaders() {
        if (writerSettings.pushMode()) {
            super.sendLastChangeToAllReaders();
        }
        matchedReaders.values().stream()
                .filter(ReaderProxy.isBestEffort())
                .map(
                        bestEffortReader ->
                                new RtpsDataMessageBuilder(
                                                getConfig(),
                                                getTracingToken(),
                                                getGuid().guidPrefix,
                                                bestEffortReader.getRemoteReaderGuid().guidPrefix)
                                        .addAll(getLastMessage()))
                .forEach(dataMessage -> submit(dataMessage));
    }

    /**
     * Submits one heartbeat message per matched reliable reader, announcing the lowest and highest
     * sequence numbers currently in the history cache.
     *
     * <p>The round is skipped when the cache reports a minimum sequence number that is not
     * positive or when there are no matched readers. After a round the heartbeat counter is
     * incremented and the heartbeat metric is recorded.
     */
    private void sendHeartbeats() {
        final long firstSeqNum = historyCache.getSeqNumMin(getGuid());
        if (firstSeqNum <= 0) {
            logger.fine("Skipping heartbeat since history cache is empty");
            return;
        }
        final long lastSeqNum = historyCache.getSeqNumMax(getGuid());
        Preconditions.isLess(0L, lastSeqNum, "Negative sequence number");
        final Collection<ReaderProxy> readers = matchedReaders.values();
        if (readers.isEmpty()) {
            logger.fine("Skipping heartbeat since there is no readers available");
            return;
        }
        readers.stream()
                .filter(ReaderProxy.isReliable())
                .map(
                        reliableReader -> {
                            GuidPrefix readerPrefix = reliableReader.getRemoteReaderGuid().guidPrefix;
                            return new RtpsHeartbeatMessageBuilder(getGuid().guidPrefix, readerPrefix, firstSeqNum, lastSeqNum, heartbeatCount);
                        })
                .forEach(heartbeat -> submit(heartbeat));
        // The logged reader count is the size of the whole matched-reader collection.
        logger.fine("Heartbeat {0} submitted to {1} readers", new Object[]{Integer.valueOf(heartbeatCount), Integer.valueOf(readers.size())});
        heartbeatCount++;
        HEARTBEATS_METER.record(1L);
    }

    /** Resends the requested changes to every matched reader, one reader at a time. */
    private void sendRequested() {
        for (ReaderProxy matchedReader : matchedReaders.values()) {
            sendRequested(matchedReader);
        }
    }

    /**
     * Submits to one reader the changes it requested, as far as they are still present in the
     * history cache. Nothing is submitted when the reader requested nothing or when none of the
     * requested changes could be found.
     *
     * @param readerProxy proxy of the reader whose requested changes are served
     */
    private void sendRequested(ReaderProxy readerProxy) {
        List<Long> requestedSeqNums = readerProxy.requestedChanges();
        Guid readerGuid = readerProxy.getRemoteReaderGuid();
        if (requestedSeqNums.isEmpty()) {
            logger.fine("Nothing to submit for reader {0} as it did not request any changes, ignoring", new Object[]{readerGuid});
            return;
        }
        RtpsDataMessageBuilder dataBuilder =
                new RtpsDataMessageBuilder(getConfig(), getTracingToken(), getGuid().guidPrefix, readerGuid.guidPrefix);
        historyCache
                .findAll(getGuid(), requestedSeqNums)
                .forEach(change -> dataBuilder.add(change.getSequenceNumber(), change.getDataValue()));
        int foundCount = dataBuilder.getDataCount();
        if (foundCount != 0) {
            submit(dataBuilder);
            logger.fine("Submitted {0} out of {1} requested changes to reader {2}", new Object[]{Integer.valueOf(foundCount), Integer.valueOf(requestedSeqNums.size()), readerGuid});
            return;
        }
        logger.fine("No requested changes were found for reader {0}", new Object[]{readerGuid});
    }

    /**
     * Exposes the reader side of this writer as a subscriber of RTPS messages.
     *
     * @return the {@code WriterRtpsReader} created in the constructor
     */
    public Flow.Subscriber<RtpsMessage> getWriterReader() {
        return writerReader;
    }
}
