package pinorobotics.rtpstalk.impl.spec.transport;

import id.xfunction.Preconditions;
import id.xfunction.function.Unchecked;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import pinorobotics.rtpstalk.RtpsTalkMetrics;
import pinorobotics.rtpstalk.impl.spec.messages.Guid;
import pinorobotics.rtpstalk.impl.spec.messages.RtpsMessage;
import pinorobotics.rtpstalk.impl.spec.messages.submessages.elements.GuidPrefix;
import pinorobotics.rtpstalk.impl.spec.transport.io.RtpsMessageReader;
import pinorobotics.rtpstalk.impl.spec.transport.io.RtpsMessageWriter;

/* loaded from: input_file:pinorobotics/rtpstalk/impl/spec/transport/DataChannel.class */
public class DataChannel implements AutoCloseable {
    private final Meter METER = GlobalOpenTelemetry.getMeter(RtpsMessageWriter.class.getSimpleName());
    private final LongHistogram SEND_TIME_METER = this.METER.histogramBuilder(RtpsTalkMetrics.SEND_TIME_METRIC).setDescription(RtpsTalkMetrics.SEND_TIME_METRIC_DESCRIPTION).ofLongs().build();
    private final LongHistogram RECEIVE_TIME_METER = this.METER.histogramBuilder(RtpsTalkMetrics.RECEIVE_TIME_METRIC).setDescription(RtpsTalkMetrics.RECEIVE_TIME_METRIC_DESCRIPTION).ofLongs().build();
    private RtpsMessageReader reader = new RtpsMessageReader();
    private RtpsMessageWriter writer = new RtpsMessageWriter();
    private DatagramChannel datagramChannel;
    private int packetBufferSize;
    private GuidPrefix guidPrefix;
    private SocketAddress target;
    private XLogger logger;
    private boolean isClosed;

    /* JADX INFO: Access modifiers changed from: protected */
    public DataChannel(TracingToken tracingToken, DatagramChannel datagramChannel, SocketAddress socketAddress, byte[] bArr, int i) {
        this.datagramChannel = datagramChannel;
        this.target = socketAddress;
        this.guidPrefix = new GuidPrefix(bArr);
        this.packetBufferSize = i;
        Preconditions.isTrue(datagramChannel.isBlocking(), "By default blocking DatagramChannel expected, see https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/nio/channels/spi/AbstractSelectableChannel.html#isBlocking()", new Object[0]);
        this.logger = XLogger.getLogger(getClass(), new TracingToken(tracingToken, new String[]{datagramChannel.socket().getLocalPort()}));
    }

    public RtpsMessage receive() throws Exception {
        ByteBuffer allocate = ByteBuffer.allocate(this.packetBufferSize);
        while (true) {
            Instant now = Instant.now();
            allocate.clear();
            try {
                this.datagramChannel.receive(allocate);
                int position = allocate.position();
                allocate.rewind();
                allocate.limit(position);
                this.logger.fine("Received UDP packet of size {0}", new Object[]{Integer.valueOf(position)});
                Optional<RtpsMessage> readRtpsMessage = this.reader.readRtpsMessage(allocate);
                if (!readRtpsMessage.isEmpty()) {
                    RtpsMessage rtpsMessage = readRtpsMessage.get();
                    if (!rtpsMessage.header.guidPrefix.equals(this.guidPrefix)) {
                        return rtpsMessage;
                    }
                    this.logger.fine("Received its own message, ignoring...");
                }
            } finally {
                this.RECEIVE_TIME_METER.record(Duration.between(now, Instant.now()).toMillis());
            }
        }
    }

    public void send(Guid guid, RtpsMessage rtpsMessage) {
        this.logger.fine("Outgoing RTPS message for remote reader {0}: {1}", new Object[]{guid, rtpsMessage});
        if (this.isClosed) {
            this.logger.fine("Data channel is already closed, ignoring the message...");
            return;
        }
        ByteBuffer allocate = ByteBuffer.allocate(this.packetBufferSize);
        allocate.rewind();
        allocate.limit(allocate.capacity());
        try {
            this.writer.writeRtpsMessage(rtpsMessage, allocate);
            allocate.limit(allocate.position());
            allocate.rewind();
            Instant now = Instant.now();
            try {
                this.datagramChannel.send(allocate, this.target);
                this.SEND_TIME_METER.record(Duration.between(now, Instant.now()).toMillis());
            } catch (Throwable th) {
                this.SEND_TIME_METER.record(Duration.between(now, Instant.now()).toMillis());
                throw th;
            }
        } catch (Throwable th2) {
            this.logger.severe(th2);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.isClosed = true;
        try {
            this.datagramChannel.close();
        } catch (IOException e) {
            this.logger.severe(e);
        }
        this.logger.fine("Closed");
    }

    public int getLocalPort() {
        Preconditions.isTrue(Unchecked.getBoolean(() -> {
            return this.datagramChannel.getLocalAddress() instanceof InetSocketAddress;
        }), "Inet socket required", new Object[0]);
        try {
            return ((InetSocketAddress) this.datagramChannel.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
