package pinorobotics.rtpstalk.impl.spec.behavior.writer;

import id.xfunction.Preconditions;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import pinorobotics.rtpstalk.RtpsTalkMetrics;
import pinorobotics.rtpstalk.impl.RtpsTalkConfigurationInternal;
import pinorobotics.rtpstalk.impl.behavior.writer.RtpsDataMessageBuilder;
import pinorobotics.rtpstalk.impl.spec.RtpsSpecReference;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.EntityId;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.ProtocolVersion;
import pinorobotics.rtpstalk.impl.spec.structure.RtpsEntity;
import pinorobotics.rtpstalk.impl.spec.transport.RtpsMessageSender;
import pinorobotics.rtpstalk.messages.RtpsTalkMessage;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/behavior/writer/RtpsWriter.class */
/**
 * Base class for RTPS writers.
 *
 * <p>An instance acts both as a {@link Flow.Subscriber} of {@code D} messages coming from a local
 * publisher and as a {@link SubmissionPublisher} of {@link RtpsMessageSender.MessageBuilder}
 * items: every message received through {@link #onNext(RtpsTalkMessage)} becomes a new change,
 * is wrapped into an {@link RtpsDataMessageBuilder} and is submitted to the subscribers of this
 * publisher.
 *
 * <p>Changes are numbered with a counter which is incremented before each new change, so the
 * first change gets number 1. Only the most recently built message is retained by this class.
 *
 * @param <D> type of the messages accepted from the local publisher
 */
public abstract class RtpsWriter<D extends RtpsTalkMessage> extends SubmissionPublisher<RtpsMessageSender.MessageBuilder> implements Flow.Subscriber<D>, RtpsEntity, AutoCloseable {
    // Both meter objects are deliberately per-instance: the lookup through GlobalOpenTelemetry
    // happens every time a writer is constructed, not once at class-load time.
    private final Meter meter = GlobalOpenTelemetry.getMeter(RtpsWriter.class.getSimpleName());
    private final LongHistogram submittedChangesMeter = this.meter.histogramBuilder(RtpsTalkMetrics.SUBMITTED_CHANGES_METRIC).setDescription(RtpsTalkMetrics.SUBMITTED_CHANGES_METRIC_DESCRIPTION).ofLongs().build();

    protected final XLogger logger;
    private final RtpsTalkConfigurationInternal config;
    private final Guid writerGuid;
    private final TracingToken tracingToken;

    /** Number of the last change created by {@link #newChange}; 0 until the first change. */
    private long lastChangeNumber;

    /** Message built for the last change; {@code null} until the first change. */
    private RtpsDataMessageBuilder lastMessage;

    /** Subscription to the local publisher; empty when not subscribed or after cancellation. */
    private Optional<Flow.Subscription> subscriptionOpt = Optional.empty();

    /**
     * Creates a writer by delegating to the five-argument constructor with {@code true} as the
     * last argument.
     *
     * @param rtpsTalkConfigurationInternal configuration; supplies the publisher buffer size and
     *     the GUID prefix
     * @param tracingToken parent tracing token
     * @param executor executor handed over to {@link SubmissionPublisher}
     * @param entityId entity id of this writer
     */
    protected RtpsWriter(RtpsTalkConfigurationInternal rtpsTalkConfigurationInternal, TracingToken tracingToken, Executor executor, EntityId entityId) {
        this(rtpsTalkConfigurationInternal, tracingToken, executor, entityId, true);
    }

    /**
     * Creates a writer whose GUID consists of the configured GUID prefix and {@code entityId}.
     *
     * @param rtpsTalkConfigurationInternal configuration; supplies the publisher buffer size and
     *     the GUID prefix
     * @param tracingToken parent tracing token; the writer derives its own token from it by
     *     appending the string form of {@code entityId}
     * @param executor executor handed over to {@link SubmissionPublisher}
     * @param entityId entity id of this writer
     * @param z not used by this constructor. NOTE(review): given the spec reference on this
     *     constructor it is presumably a "push mode" flag - confirm against the original sources
     */
    @RtpsSpecReference(paragraph = "8.4.9.1.1", protocolVersion = ProtocolVersion.Predefined.Version_2_3, text = "Writer always pushes out data as it becomes available")
    public RtpsWriter(RtpsTalkConfigurationInternal rtpsTalkConfigurationInternal, TracingToken tracingToken, Executor executor, EntityId entityId, boolean z) {
        super(executor, rtpsTalkConfigurationInternal.publicConfig().publisherMaxBufferSize());
        this.config = rtpsTalkConfigurationInternal;
        this.tracingToken = new TracingToken(tracingToken, new String[]{entityId.toString()});
        this.writerGuid = new Guid(rtpsTalkConfigurationInternal.publicConfig().guidPrefix(), entityId);
        // The logger is created last because it is bound to the writer specific tracing token.
        this.logger = XLogger.getLogger(getClass(), this.tracingToken);
    }

    /**
     * @return number assigned to the most recent change, or 0 if no change was created yet
     */
    public long getLastChangeNumber() {
        return this.lastChangeNumber;
    }

    /**
     * @return message built for the most recent change, or {@code null} if no change was created
     *     yet
     */
    public RtpsDataMessageBuilder getLastMessage() {
        return this.lastMessage;
    }

    /**
     * Submits the message of the most recent change to the subscribers once more.
     *
     * <p>Requires that at least one change was created before; otherwise the not-null
     * precondition check on the last message fails.
     */
    public void repeatLastChange() {
        Preconditions.notNull(this.lastMessage);
        this.logger.entering("repeatLastChange");
        submit(this.lastMessage);
        this.logger.exiting("repeatLastChange");
    }

    /**
     * Registers {@code d} as a new change: records it in the submitted-changes histogram,
     * assigns it the next change number, builds a fresh data message for it and passes that
     * message to {@link #sendLastChangeToAllReaders()}.
     *
     * @param d message which forms the new change
     * @return number assigned to the new change
     */
    public long newChange(D d) {
        this.logger.entering("newChange");
        this.submittedChangesMeter.record(1L);
        this.lastChangeNumber++;
        // A new builder is created per change, so the previous message is replaced, not extended.
        this.lastMessage = new RtpsDataMessageBuilder(this.config, this.writerGuid.guidPrefix);
        this.lastMessage.add(this.lastChangeNumber, d);
        sendLastChangeToAllReaders();
        this.logger.exiting("newChange");
        return this.lastChangeNumber;
    }

    /**
     * Requests one more message from the local publisher. Does nothing when there is no
     * subscription.
     */
    public void request() {
        this.subscriptionOpt.ifPresent(subscription -> {
            this.logger.fine("Requesting next message from the local publisher");
            subscription.request(1L);
        });
    }

    /**
     * Submits the message of the most recent change to the subscribers of this publisher.
     * Subclasses may override it to change how the last change is delivered.
     */
    protected void sendLastChangeToAllReaders() {
        submit(this.lastMessage);
    }

    /**
     * @return GUID of this writer
     */
    @Override
    public Guid getGuid() {
        return this.writerGuid;
    }

    /**
     * Stores the subscription and requests the first message. The writer accepts only one
     * subscription at a time: the precondition check fails with "Already subscribed" if a
     * subscription is already present.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Preconditions.isTrue(this.subscriptionOpt.isEmpty(), "Already subscribed");
        this.subscriptionOpt = Optional.of(subscription);
        subscription.request(1L);
    }

    /**
     * Turns the received message into a new change and then requests the next message, so
     * messages are pulled from the local publisher one at a time.
     */
    @Override
    public void onNext(D d) {
        newChange(d);
        request();
    }

    /** Logs the error at severe level; no other action is taken. */
    @Override
    public void onError(Throwable th) {
        this.logger.severe(th);
    }

    /** Logs that the local publisher stopped; the writer itself is left open. */
    @Override
    public void onComplete() {
        this.logger.fine("Publisher stopped");
    }

    /**
     * Cancels the subscription to the local publisher and closes this publisher.
     *
     * <p>The publisher is closed even if cancelling the subscription throws; in that case the
     * exception is propagated after the publisher was closed.
     */
    @Override
    public void close() {
        try {
            cancelSubscription();
        } finally {
            super.close();
        }
        this.logger.fine("Closed");
    }

    /**
     * @return configuration this writer was created with
     */
    public RtpsTalkConfigurationInternal getConfig() {
        return this.config;
    }

    /**
     * Cancels the current subscription to the local publisher, if any, and forgets it so that
     * {@link #onSubscribe(Flow.Subscription)} can be called again.
     */
    public void cancelSubscription() {
        this.subscriptionOpt.ifPresent(Flow.Subscription::cancel);
        this.subscriptionOpt = Optional.empty();
    }

    /**
     * @return tracing token of this writer, derived from the token passed to the constructor
     */
    public TracingToken getTracingToken() {
        return this.tracingToken;
    }
}
