package id.jros1client.ros.transport;

import id.ICE.MessageResponse;
import id.jros1client.ros.transport.io.MessagePacketWriter;
import id.jros1messages.MessageSerializationUtils;
import id.jrosclient.utils.TextUtils;
import id.jrosmessages.Message;
import id.jrosmessages.MessageMetadataAccessor;
import id.xfunction.Preconditions;
import id.xfunction.io.XOutputStream;
import id.xfunction.lang.XRE;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;

/* loaded from: input_file:id/jros1client/ros/transport/TopicPublisherSubscriber.class */
public class TopicPublisherSubscriber implements Flow.Subscriber<Message> {
    private XLogger logger;
    private TextUtils utils;
    private MessageMetadataAccessor metadataAccessor = new MessageMetadataAccessor();
    private MessageSerializationUtils serializationUtils = new MessageSerializationUtils();
    private CompletableFuture<MessageResponse> future = CompletableFuture.completedFuture(null);
    private CompletableFuture<Flow.Subscription> subscriptionFuture = new CompletableFuture<>();
    private boolean isConnectionEstablished;
    private boolean isCompleted;
    private String topic;
    private String callerId;
    private Class<? extends Message> messageClass;

    public TopicPublisherSubscriber(TracingToken tracingToken, String str, String str2, Class<? extends Message> cls, TextUtils textUtils) {
        this.logger = XLogger.getLogger(this);
        this.callerId = str;
        this.topic = str2;
        this.messageClass = cls;
        this.utils = textUtils;
        this.logger = XLogger.getLogger(TopicPublisherSubscriber.class, new TracingToken(tracingToken, new String[]{hashCode()}));
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscriptionFuture.complete(subscription);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(Message message) {
        this.logger.entering("onNext");
        this.logger.fine("Published new message: {0}", new Object[]{this.utils.toString(message)});
        Preconditions.isTrue(message.getClass() == this.messageClass, "Incompatible message type");
        sendPacket(createMessagePacket(message));
        this.logger.exiting("onNext");
    }

    public CompletableFuture<MessageResponse> request() {
        Preconditions.isTrue(!this.isCompleted, "Fail to request new message since subscriber has completed");
        if (!this.future.isDone()) {
            return this.future;
        }
        this.future = new CompletableFuture<>();
        try {
            Flow.Subscription subscription = this.subscriptionFuture.get();
            if (this.isConnectionEstablished) {
                subscription.request(1L);
            } else {
                this.logger.fine("This is a new connection - creating handshake packet first");
                sendPacket(createHandshakeMessagePacket());
                this.isConnectionEstablished = true;
            }
            return this.future;
        } catch (InterruptedException | ExecutionException e) {
            throw new XRE("Never was subscribed to the TopicPublisher");
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        this.logger.entering("onError");
        this.logger.severe("Error: {0}", new Object[]{th.getMessage()});
        try {
            this.subscriptionFuture.get().cancel();
        } catch (InterruptedException | ExecutionException e) {
            th.addSuppressed(e);
        }
        this.isCompleted = true;
        this.future.complete(null);
        this.logger.exiting("onError");
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        this.logger.entering("onComplete");
        this.isCompleted = true;
        this.logger.exiting("onComplete");
    }

    public boolean isCompleted() {
        return this.isCompleted;
    }

    public String getTopic() {
        return this.topic;
    }

    public String getCallerId() {
        return this.callerId;
    }

    private void sendPacket(MessagePacket messagePacket) {
        this.logger.entering("sendPacket");
        XOutputStream xOutputStream = new XOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(xOutputStream));
        try {
            new MessagePacketWriter(dataOutputStream).write(messagePacket);
            dataOutputStream.flush();
            this.logger.fine("Sending packet to subscriber");
            this.logger.fine(this.utils.toString(xOutputStream.asHexString()));
            this.future.complete(new MessageResponse(ByteBuffer.wrap(xOutputStream.toByteArray())).withIgnoreNextRequest().withErrorHandler(this::onError));
            this.logger.exiting("sendPacket");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private MessagePacket createHandshakeMessagePacket() {
        ConnectionHeader connectionHeader = new ConnectionHeader();
        connectionHeader.withType(this.metadataAccessor.getName(this.messageClass)).withMd5Sum(this.metadataAccessor.getMd5(this.messageClass));
        return new MessagePacket(connectionHeader, null);
    }

    private MessagePacket createMessagePacket(Message message) {
        return new MessagePacket(new ConnectionHeader(), this.serializationUtils.write(message));
    }
}
