package id.jrosclient;

import id.jrosmessages.Message;
import id.xfunction.XJson;
import id.xfunction.lang.XThread;
import id.xfunction.logging.XLogger;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

/* loaded from: input_file:id/jrosclient/TopicSubmissionPublisher.class */
public class TopicSubmissionPublisher<M extends Message> extends SubmissionPublisher<M> implements TopicPublisher<M> {
    private final XLogger LOGGER;
    private final Meter METER;
    private final LongHistogram TOPIC_PUBLISHER_OBJECTS_METER;
    private final LongHistogram TOPIC_PUBLISHER_SUBMITTED_MESSAGE_METER;
    private final LongHistogram TOPIC_PUBLISHER_ERRORS_METER;
    private Class<M> messageClass;
    private String topic;

    public TopicSubmissionPublisher(Class<M> cls, String str) {
        this(cls, str, ForkJoinPool.commonPool(), Flow.defaultBufferSize());
    }

    public TopicSubmissionPublisher(Class<M> cls, String str, Executor executor, int i) {
        super(executor, i);
        this.LOGGER = XLogger.getLogger(this);
        this.METER = GlobalOpenTelemetry.getMeter(TopicSubmissionPublisher.class.getSimpleName());
        this.TOPIC_PUBLISHER_OBJECTS_METER = this.METER.histogramBuilder(JRosClientMetrics.TOPIC_PUBLISHER_OBJECTS_METRIC).setDescription(JRosClientMetrics.TOPIC_PUBLISHER_OBJECTS_METRIC_DESCRIPTION).ofLongs().build();
        this.TOPIC_PUBLISHER_SUBMITTED_MESSAGE_METER = this.METER.histogramBuilder(JRosClientMetrics.TOPIC_PUBLISHER_SUBMITTED_MESSAGES_METRIC).setDescription(JRosClientMetrics.TOPIC_PUBLISHER_SUBMITTED_MESSAGES_METRIC_DESCRIPTION).ofLongs().build();
        this.TOPIC_PUBLISHER_ERRORS_METER = this.METER.histogramBuilder(JRosClientMetrics.TOPIC_PUBLISHER_ERRORS_METRIC).setDescription(JRosClientMetrics.TOPIC_PUBLISHER_ERRORS_METRIC_DESCRIPTION).ofLongs().build();
        this.messageClass = cls;
        this.topic = str;
        this.TOPIC_PUBLISHER_OBJECTS_METER.record(1L);
    }

    @Override // id.jrosclient.TopicPublisher
    public Class<M> getMessageClass() {
        return this.messageClass;
    }

    @Override // id.jrosclient.TopicPublisher
    public String getTopic() {
        return this.topic;
    }

    @Override // java.util.concurrent.SubmissionPublisher
    public int submit(M m) {
        this.TOPIC_PUBLISHER_SUBMITTED_MESSAGE_METER.record(1L);
        return super.submit((TopicSubmissionPublisher<M>) m);
    }

    @Override // java.util.concurrent.SubmissionPublisher, java.lang.AutoCloseable, id.jrosclient.TopicPublisher
    public void close() {
        this.LOGGER.entering("close");
        while (estimateMaximumLag() > 0) {
            this.LOGGER.fine("Some messages are still waiting to be delivered, sleeping...");
            XThread.sleep(100L);
        }
        this.LOGGER.fine("There is no more messages waiting to be delivered, closing the publisher");
        super.close();
        this.LOGGER.exiting("close");
    }

    public String toString() {
        return XJson.asString(new Object[]{"topic", this.topic});
    }

    @Override // id.jrosclient.TopicPublisher
    public void onPublishError(Throwable th) {
        this.LOGGER.severe("Error delivering message to the subscriber", th);
        this.TOPIC_PUBLISHER_ERRORS_METER.record(1L);
    }
}
