package pinorobotics.rtpstalk.impl.spec.behavior.reader;

import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import pinorobotics.rtpstalk.RtpsTalkMetrics;
import pinorobotics.rtpstalk.impl.behavior.reader.WriterHeartbeatProcessor;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.Locator;
import pinorobotics.rtpstalk.impl.spec.transport.DataChannel;
import pinorobotics.rtpstalk.impl.spec.transport.DataChannelFactory;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/behavior/reader/WriterProxy.class */
public class WriterProxy {
    private final Guid readerGuid;
    private final Guid remoteWriterGuid;
    private final List<Locator> unicastLocatorList;
    private final XLogger logger;
    private final WriterHeartbeatProcessor heartbeatProcessor;
    private final TracingToken tracingToken;
    private final DataChannelFactory dataChannelFactory;
    private DataChannel dataChannel;
    private final Meter METER = GlobalOpenTelemetry.getMeter(WriterProxy.class.getSimpleName());
    private final LongCounter LOST_CHANGES_COUNT_METER = this.METER.counterBuilder(RtpsTalkMetrics.LOST_CHANGES_COUNT_METRIC).setDescription(RtpsTalkMetrics.LOST_CHANGES_COUNT_METRIC_DESCRIPTION).build();
    private final SortedMap<Long, ChangeFromWriterStatusKind> sortedChangesFromWriter = new ConcurrentSkipListMap();
    private final AtomicLong seqNumMax = new AtomicLong();

    public WriterProxy(TracingToken tracingToken, DataChannelFactory dataChannelFactory, int i, Guid guid, Guid guid2, List<Locator> list) {
        this.dataChannelFactory = dataChannelFactory;
        this.readerGuid = guid;
        this.remoteWriterGuid = guid2;
        this.unicastLocatorList = List.copyOf(list);
        this.tracingToken = tracingToken;
        this.logger = XLogger.getLogger(getClass(), tracingToken);
        this.heartbeatProcessor = new WriterHeartbeatProcessor(tracingToken, this, i);
    }

    public void receivedChangeSet(long j) {
        if (this.sortedChangesFromWriter.get(Long.valueOf(j)) == ChangeFromWriterStatusKind.RECEIVED) {
            this.logger.fine("Change with sequence number {0} already present in the cache, ignoring...", new Object[]{Long.valueOf(j)});
            return;
        }
        this.sortedChangesFromWriter.put(Long.valueOf(j), ChangeFromWriterStatusKind.RECEIVED);
        this.logger.fine("New change added into the cache");
        if (this.seqNumMax.get() < j) {
            this.logger.fine("Updating maximum sequence number to {0}", new Object[]{Long.valueOf(j)});
            this.seqNumMax.set(j);
        }
    }

    public Guid getRemoteWriterGuid() {
        return this.remoteWriterGuid;
    }

    public Guid getReaderGuid() {
        return this.readerGuid;
    }

    public long availableChangesMax() {
        return this.seqNumMax.get();
    }

    public List<Locator> getUnicastLocatorList() {
        return this.unicastLocatorList;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v6, types: [long, java.util.Map, java.util.HashMap] */
    public void missingChangesUpdate(long j, long j2) {
        Iterator<Map.Entry<Long, ChangeFromWriterStatusKind>> it = this.sortedChangesFromWriter.tailMap(Long.valueOf(j)).entrySet().iterator();
        long j3 = j;
        ?? hashMap = new HashMap();
        while (it.hasNext() && j3 <= j2) {
            Map.Entry<Long, ChangeFromWriterStatusKind> next = it.next();
            if (next.getKey().longValue() >= j3) {
                while (j3 < next.getKey().longValue()) {
                    j3++;
                    hashMap.put(Long.valueOf((long) hashMap), ChangeFromWriterStatusKind.MISSING);
                }
                if (next.getValue() == ChangeFromWriterStatusKind.RECEIVED) {
                    j3++;
                } else {
                    j3++;
                    hashMap.put(Long.valueOf((long) hashMap), ChangeFromWriterStatusKind.MISSING);
                }
            }
        }
        while (j3 <= j2) {
            j3++;
            hashMap.put(Long.valueOf((long) hashMap), ChangeFromWriterStatusKind.MISSING);
        }
        this.sortedChangesFromWriter.putAll(hashMap);
    }

    public void lostChangesUpdate(long j) {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<Long, ChangeFromWriterStatusKind>> it = this.sortedChangesFromWriter.entrySet().iterator();
        while (it.hasNext()) {
            Long key = it.next().getKey();
            if (key.longValue() >= j) {
                break;
            }
            arrayList.add(key);
            it.remove();
        }
        this.LOST_CHANGES_COUNT_METER.add(arrayList.size());
        if (arrayList.isEmpty()) {
            return;
        }
        this.logger.fine("Changes from {0} to {1} are not available on the Writer anymore and are lost", new Object[]{arrayList.get(0), arrayList.get(arrayList.size() - 1)});
    }

    public long[] missingChangesSorted() {
        return this.sortedChangesFromWriter.entrySet().stream().filter(entry -> {
            return entry.getValue() == ChangeFromWriterStatusKind.MISSING;
        }).mapToLong((v0) -> {
            return v0.getKey();
        }).toArray();
    }

    public WriterHeartbeatProcessor getHeartbeatProcessor() {
        return this.heartbeatProcessor;
    }

    public DataChannel getDataChannel() {
        if (this.dataChannel == null) {
            Locator locator = getUnicastLocatorList().get(0);
            try {
                this.dataChannel = this.dataChannelFactory.connect(this.tracingToken, locator);
            } catch (IOException e) {
                throw new RuntimeException("Cannot open connection to remote writer on " + locator, e);
            }
        }
        return this.dataChannel;
    }
}
