package pinorobotics.rtpstalk.impl.spec.behavior.writer;

import id.xfunction.concurrent.flow.SimpleSubscriber;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import id.xfunction.util.IntBitSet;
import java.util.Optional;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.RtpsMessage;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.AckNack;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.GuidPrefix;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.SequenceNumberSet;
import pinorobotics.rtpstalk.impl.spec.messages.walk.Result;
import pinorobotics.rtpstalk.impl.spec.messages.walk.RtpsSubmessageVisitor;
import pinorobotics.rtpstalk.impl.spec.messages.walk.RtpsSubmessagesWalker;
import pinorobotics.rtpstalk.messages.RtpsTalkMessage;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/behavior/writer/WriterRtpsReader.class */
/**
 * Subscriber that receives incoming {@link RtpsMessage}s on behalf of a
 * {@link StatefullReliableRtpsWriter} and walks their submessages, reacting to
 * {@link AckNack} submessages by updating the matching {@link ReaderProxy}.
 *
 * @param <D> type of user messages handled by the associated writer
 */
public class WriterRtpsReader<D extends RtpsTalkMessage> extends SimpleSubscriber<RtpsMessage>
        implements RtpsSubmessageVisitor {
    private final XLogger logger;
    private RtpsSubmessagesWalker walker = new RtpsSubmessagesWalker();
    private StatefullReliableRtpsWriter<D> writer;

    /**
     * Creates a reader bound to the given writer.
     *
     * @param tracingToken token passed to the logger factory for this instance
     * @param statefullReliableRtpsWriter writer whose matched readers are looked up when an
     *     AckNack arrives
     */
    public WriterRtpsReader(TracingToken tracingToken, StatefullReliableRtpsWriter<D> statefullReliableRtpsWriter) {
        this.writer = statefullReliableRtpsWriter;
        this.logger = XLogger.getLogger(getClass(), tracingToken);
    }

    /**
     * Handles a single AckNack submessage.
     *
     * <p>The sending reader is identified by the message's GUID prefix combined with the
     * submessage's {@code readerId}. If the writer has no matched reader for that GUID the
     * submessage is only logged. Otherwise the reader proxy is told about acknowledged changes
     * ({@code bitmapBase - 1}), its requested changes are cleared, and one change is requested
     * for every bit set in the submessage's bitmap.
     *
     * @param guidPrefix GUID prefix of the participant that sent the submessage
     * @param ackNack the AckNack submessage to process
     * @return always {@link Result#CONTINUE}
     */
    @Override
    public Result onAckNack(GuidPrefix guidPrefix, AckNack ackNack) {
        Guid readerGuid = new Guid(guidPrefix, ackNack.readerId);
        Optional<ReaderProxy> proxyLookup = writer.matchedReaderLookup(readerGuid);

        // Guard clause: nothing to update when the reader is not matched with this writer.
        if (proxyLookup.isEmpty()) {
            logger.fine(
                    "No matched reader {0} for writer {1}, ignoring it...",
                    new Object[] {readerGuid, ackNack.writerId});
            return Result.CONTINUE;
        }

        logger.fine(
                "Processing acknack for writer {0} received from reader {1}",
                new Object[] {ackNack.writerId, readerGuid});
        ReaderProxy proxy = proxyLookup.get();
        SequenceNumberSet snState = ackNack.readerSNState;
        long base = snState.bitmapBase.value;
        IntBitSet requestedBits = new IntBitSet(snState.bitmap);

        proxy.ackedChanges(snState.bitmapBase.value - 1);
        // Drop previously requested changes before registering the ones from this AckNack.
        proxy.requestedChangesClear();

        // Each set bit is an offset from the bitmap base; nextSetBit returns a negative
        // value once no further bits are set, which ends the loop.
        for (int offset = requestedBits.nextSetBit(0);
                offset >= 0;
                offset = requestedBits.nextSetBit(offset + 1)) {
            proxy.requestChange(base + offset);
        }
        return Result.CONTINUE;
    }

    /**
     * Walks the submessages of the received message with this object as the visitor.
     *
     * <p>Any exception thrown while walking is logged and not propagated. One more item is
     * requested from the subscription in all cases.
     *
     * @param rtpsMessage message delivered by the publisher
     */
    public void onNext(RtpsMessage rtpsMessage) {
        try {
            walker.walk(rtpsMessage, this);
        } catch (Exception e) {
            logger.severe(e);
        } finally {
            // Requested in finally so a failure on one message does not stall the stream.
            subscription.request(1L);
        }
    }
}
