package pinorobotics.rtpstalk.impl.spec.behavior.reader;

import id.xfunction.logging.TracingToken;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import pinorobotics.rtpstalk.RtpsTalkConfiguration;
import pinorobotics.rtpstalk.impl.spec.RtpsSpecReference;
import pinorobotics.rtpstalk.impl.spec.behavior.OperatingEntities;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.Locator;
import pinorobotics.rtpstalk.impl.spec.messages.ReliabilityQosPolicy;
import pinorobotics.rtpstalk.impl.spec.messages.RtpsMessage;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.Heartbeat;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.GuidPrefix;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.ProtocolVersion;
import pinorobotics.rtpstalk.impl.spec.messages.walk.Result;
import pinorobotics.rtpstalk.impl.spec.structure.history.CacheChange;
import pinorobotics.rtpstalk.messages.RtpsTalkMessage;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/behavior/reader/StatefullReliableRtpsReader.class */
public class StatefullReliableRtpsReader<D extends RtpsTalkMessage> extends RtpsReader<D> {
    private Map<Guid, WriterProxy> matchedWriters;
    private RtpsTalkConfiguration config;
    private OperatingEntities operatingEntities;

    public StatefullReliableRtpsReader(RtpsTalkConfiguration rtpsTalkConfiguration, TracingToken tracingToken, Class<D> cls, Executor executor, OperatingEntities operatingEntities, EntityId entityId) {
        super(rtpsTalkConfiguration, tracingToken, cls, executor, new Guid(rtpsTalkConfiguration.guidPrefix(), entityId), ReliabilityQosPolicy.Kind.RELIABLE);
        this.matchedWriters = new ConcurrentHashMap();
        this.config = rtpsTalkConfiguration;
        this.operatingEntities = operatingEntities;
        operatingEntities.getReaders().add(this);
    }

    public void matchedWriterAdd(Guid guid, List<Locator> list) {
        WriterProxy writerProxy = new WriterProxy(getTracingToken(), this.config, getGuid(), guid, list);
        this.logger.fine("Adding writer proxy for writer with guid {0}", new Object[]{writerProxy.getRemoteWriterGuid()});
        this.matchedWriters.put(writerProxy.getRemoteWriterGuid(), writerProxy);
    }

    public void matchedWriterRemove(Guid guid) {
        if (this.matchedWriters.remove(guid) == null) {
            this.logger.warning("Trying to remove unknwon matched writer {0}, ignoring...", new Object[]{guid});
        } else {
            this.logger.warning("Matched writer {0} is removed", new Object[]{guid});
        }
    }

    @Override // pinorobotics.rtpstalk.impl.spec.messages.walk.RtpsSubmessageVisitor
    @RtpsSpecReference(paragraph = "8.3.7.5.5", protocolVersion = ProtocolVersion.Predefined.Version_2_3, text = "However, if the FinalFlag is not set, then the Reader must send an AckNack message")
    public Result onHeartbeat(GuidPrefix guidPrefix, Heartbeat heartbeat) {
        if (!heartbeat.isFinal()) {
            Guid guid = new Guid(guidPrefix, heartbeat.writerId);
            WriterProxy writerProxy = this.matchedWriters.get(guid);
            if (writerProxy != null) {
                this.logger.fine("Received heartbeat from writer {0}", new Object[]{guid});
                writerProxy.getHeartbeatProcessor().addHeartbeat(heartbeat);
            } else {
                this.logger.fine("Received heartbeat from unknown writer {0}, ignoring...", new Object[]{guid});
            }
        }
        return super.onHeartbeat(guidPrefix, heartbeat);
    }

    @Override // pinorobotics.rtpstalk.impl.spec.behavior.reader.RtpsReader
    protected boolean addChange(CacheChange<D> cacheChange) {
        if (!super.addChange(cacheChange)) {
            return false;
        }
        WriterProxy writerProxy = this.matchedWriters.get(cacheChange.getWriterGuid());
        if (writerProxy != null) {
            writerProxy.receivedChangeSet(cacheChange.getSequenceNumber());
            return true;
        }
        this.logger.fine("No matched writer with guid {0} found for a new change, ignoring...", new Object[]{cacheChange.getWriterGuid()});
        return true;
    }

    @Override // pinorobotics.rtpstalk.impl.spec.behavior.reader.RtpsReader
    protected void process(RtpsMessage rtpsMessage) {
        super.process(rtpsMessage);
        this.matchedWriters.values().stream().map((v0) -> {
            return v0.getHeartbeatProcessor();
        }).forEach((v0) -> {
            v0.ack();
        });
    }

    protected OperatingEntities getOperatingEntities() {
        return this.operatingEntities;
    }
}
