package pinorobotics.rtpstalk.impl.spec.behavior.reader;

import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.LongStream;
import pinorobotics.rtpstalk.RtpsTalkConfiguration;
import pinorobotics.rtpstalk.RtpsTalkMetrics;
import pinorobotics.rtpstalk.impl.behavior.reader.WriterHeartbeatProcessor;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.Locator;
import pinorobotics.rtpstalk.impl.spec.transport.DataChannel;
import pinorobotics.rtpstalk.impl.spec.transport.DataChannelFactory;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/behavior/reader/WriterProxy.class */
public class WriterProxy {
    private Guid readerGuid;
    private Guid remoteWriterGuid;
    private List<Locator> unicastLocatorList;
    private XLogger logger;
    private WriterHeartbeatProcessor heartbeatProcessor;
    private DataChannel dataChannel;
    private TracingToken tracingToken;
    private RtpsTalkConfiguration config;
    private final Meter METER = GlobalOpenTelemetry.getMeter(WriterProxy.class.getSimpleName());
    private final LongCounter LOST_CHANGES_COUNT_METER = this.METER.counterBuilder(RtpsTalkMetrics.LOST_CHANGES_COUNT_METRIC).setDescription(RtpsTalkMetrics.LOST_CHANGES_COUNT_METRIC_DESCRIPTION).build();
    private Map<Long, ChangeFromWriterStatusKind> changesFromWriter = new LinkedHashMap();
    private long seqNumMax = 0;

    public WriterProxy(TracingToken tracingToken, RtpsTalkConfiguration rtpsTalkConfiguration, Guid guid, Guid guid2, List<Locator> list) {
        this.config = rtpsTalkConfiguration;
        this.readerGuid = guid;
        this.remoteWriterGuid = guid2;
        this.unicastLocatorList = List.copyOf(list);
        this.tracingToken = tracingToken;
        this.logger = XLogger.getLogger(getClass(), tracingToken);
        this.heartbeatProcessor = new WriterHeartbeatProcessor(tracingToken, this);
    }

    public void receivedChangeSet(long j) {
        if (this.changesFromWriter.get(Long.valueOf(j)) == ChangeFromWriterStatusKind.RECEIVED) {
            this.logger.fine("Change already present in the cache, ignoring...");
            return;
        }
        this.changesFromWriter.put(Long.valueOf(j), ChangeFromWriterStatusKind.RECEIVED);
        this.logger.fine("New change added into the cache");
        if (this.seqNumMax < j) {
            this.logger.fine("Updating maximum sequence number to {0}", new Object[]{Long.valueOf(j)});
            this.seqNumMax = j;
        }
    }

    public Guid getRemoteWriterGuid() {
        return this.remoteWriterGuid;
    }

    public Guid getReaderGuid() {
        return this.readerGuid;
    }

    public long availableChangesMax() {
        return this.seqNumMax;
    }

    public List<Locator> getUnicastLocatorList() {
        return this.unicastLocatorList;
    }

    public void missingChangesUpdate(long j) {
        LongStream.rangeClosed(this.seqNumMax + 1, j).forEach(j2 -> {
            this.changesFromWriter.put(Long.valueOf(j2), ChangeFromWriterStatusKind.MISSING);
        });
    }

    public void lostChangesUpdate(long j) {
        LinkedHashMap linkedHashMap = new LinkedHashMap(this.changesFromWriter.size());
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<Long, ChangeFromWriterStatusKind> entry : this.changesFromWriter.entrySet()) {
            Long key = entry.getKey();
            if (key.longValue() < j) {
                arrayList.add(key);
            } else {
                linkedHashMap.put(entry.getKey(), entry.getValue());
            }
        }
        this.LOST_CHANGES_COUNT_METER.add(arrayList.size());
        if (!arrayList.isEmpty()) {
            Collections.sort(arrayList);
            this.logger.fine("Changes from {0} to {1} are not available on the Writer anymore and are lost", new Object[]{arrayList.get(0), arrayList.get(arrayList.size() - 1)});
        }
        this.changesFromWriter = linkedHashMap;
    }

    public long[] missingChanges() {
        return this.changesFromWriter.entrySet().stream().filter(entry -> {
            return entry.getValue() == ChangeFromWriterStatusKind.MISSING;
        }).mapToLong((v0) -> {
            return v0.getKey();
        }).toArray();
    }

    public WriterHeartbeatProcessor getHeartbeatProcessor() {
        return this.heartbeatProcessor;
    }

    public DataChannel getDataChannel() {
        if (this.dataChannel == null) {
            DataChannelFactory dataChannelFactory = new DataChannelFactory(this.config);
            Locator locator = getUnicastLocatorList().get(0);
            try {
                this.dataChannel = dataChannelFactory.connect(this.tracingToken, locator);
            } catch (IOException e) {
                throw new RuntimeException("Cannot open connection to remote writer on " + locator, e);
            }
        }
        return this.dataChannel;
    }
}
