package id.jros1client.ros.transport;

import id.ICE.MessageRequest;
import id.ICE.MessageResponse;
import id.ICE.MessageServer;
import id.ICE.MessageService;
import id.jros1client.JRos1ClientConfiguration;
import id.jros1client.ros.transport.io.ConnectionHeaderReader;
import id.jrosclient.TopicPublisher;
import id.jrosclient.utils.TextUtils;
import id.jrosmessages.MessageMetadataAccessor;
import id.xfunction.Preconditions;
import id.xfunction.function.Unchecked;
import id.xfunction.io.ByteBufferInputStream;
import id.xfunction.lang.XRE;
import id.xfunction.logging.TracingToken;
import id.xfunction.logging.XLogger;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;

/* loaded from: input_file:id/jros1client/ros/transport/TcpRosServer.class */
/**
 * Message service that serves incoming subscriber connections for locally registered topic
 * publishers.
 *
 * <p>For every connection the first request is expected to carry a connection header. The header
 * is validated and, if a publisher for the requested topic exists, a {@link
 * TopicPublisherSubscriber} is created, subscribed to that publisher and remembered under the
 * connection id. Every following request on the same connection asks that subscriber for the next
 * message.
 *
 * <p>Whenever {@link #process(MessageRequest)} returns a future completed with {@code null} the
 * request is not served; judging by the accompanying log messages this tells the underlying
 * {@link MessageServer} to close the connection.
 */
public class TcpRosServer implements MessageService, AutoCloseable {
    private final XLogger logger;
    private final MessageServer server;
    private final PublishersManager publishersManager;
    private final TextUtils utils;
    private boolean isStarted;
    private final TracingToken tracingToken;
    private final MessageMetadataAccessor metadataAccessor = new MessageMetadataAccessor();
    private final ConnectionHeaderValidator headerValidator =
            new ConnectionHeaderValidator(this.metadataAccessor);

    /** Active subscribers keyed by the id of the connection they serve. */
    private final Map<Integer, TopicPublisherSubscriber> subscribers = new ConcurrentHashMap<>();

    /**
     * Ids of connections whose subscriber failed and which must be closed on their next request.
     *
     * <p>The set is filled from the subscriber's {@code onError} callback and drained from {@link
     * #process(MessageRequest)}. These are presumably invoked on different threads (publisher
     * delivery vs. server request handling), so a concurrent set is used, matching {@link
     * #subscribers}.
     */
    private final Set<Integer> closedConnections = ConcurrentHashMap.newKeySet();

    /**
     * Creates a server which is not started yet.
     *
     * @param tracingToken parent tracing token used for logging
     * @param publishersManager source of the currently registered topic publishers
     * @param jRos1ClientConfiguration configuration providing the port to listen on
     * @param textUtils text utilities handed over to every created subscriber
     */
    public TcpRosServer(TracingToken tracingToken, PublishersManager publishersManager, JRos1ClientConfiguration jRos1ClientConfiguration, TextUtils textUtils) {
        this.tracingToken = tracingToken;
        this.publishersManager = publishersManager;
        this.server = new MessageServer(this, new ConnectionHeaderScanner()).withPort(jRos1ClientConfiguration.getTcpRosServerPort());
        this.utils = textUtils;
        // The identity hash distinguishes log records of different server instances. It has to be
        // converted explicitly: an int cannot be placed into a String[] initializer.
        this.logger = XLogger.getLogger(TcpRosServer.class, new TracingToken(tracingToken, new String[] {String.valueOf(hashCode())}));
    }

    /**
     * Starts the underlying message server. Does nothing when the server is already started.
     *
     * @throws IOException declared for callers; propagated from the underlying server start
     */
    public void start() throws IOException {
        if (this.isStarted) {
            return;
        }
        this.logger.fine("Starting...");
        // The flag is raised before run() so that a repeated start() call is ignored.
        this.isStarted = true;
        this.server.run();
    }

    /**
     * Completes all active subscribers and closes the underlying message server.
     *
     * @throws XRE if there are still publishers registered in the publishers manager
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        List<TopicPublisher<?>> publishers = this.publishersManager.getPublishers();
        if (!publishers.isEmpty()) {
            throw new XRE("Attempt to close client with active publishers: %s", new Object[] {publishers});
        }
        this.logger.fine("Stopping...");
        this.subscribers.values().forEach(TopicPublisherSubscriber::onComplete);
        this.subscribers.clear();
        Unchecked.run(() -> {
            this.server.close();
        });
        this.isStarted = false;
    }

    /**
     * Handles a single request of a connection.
     *
     * <ul>
     *   <li>If the connection was marked as closed after a subscriber error, the mark is removed
     *       and the request is not served.
     *   <li>If no subscriber is known for the connection, the request message is treated as a
     *       connection header and a new subscriber is registered.
     *   <li>Otherwise the existing subscriber is used, provided that a publisher for its topic is
     *       still present; if not, the subscriber is completed and forgotten.
     * </ul>
     *
     * @param messageRequest incoming request
     * @return future with the next message for the connection, or a future completed with
     *     {@code null} when the request is not served
     */
    public CompletableFuture<MessageResponse> process(MessageRequest messageRequest) {
        int connectionId = messageRequest.getConnectionId();
        this.logger.entering("process", new Object[] {connectionId});
        if (this.closedConnections.remove(connectionId)) {
            this.logger.info("Closing connection as there is no more publishers serving it...");
            return CompletableFuture.completedFuture(null);
        }
        TopicPublisherSubscriber subscriber = this.subscribers.get(connectionId);
        if (subscriber == null) {
            // First request of the connection: it has to carry the connection header.
            ByteBuffer header = (ByteBuffer) messageRequest.getMessage().orElse(null);
            if (header == null) {
                this.logger.info("Received registration request with no message, closing the connection...");
                return CompletableFuture.completedFuture(null);
            }
            Optional<TopicPublisherSubscriber> registered = registerSubscriber(connectionId, header);
            if (registered.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            subscriber = registered.get();
            this.subscribers.put(connectionId, subscriber);
        } else {
            String topic = subscriber.getTopic();
            if (!this.publishersManager.getPublisher(topic).isPresent()) {
                this.logger.log(Level.FINE, "No publishers found for topic {0}, closing...", topic);
                this.subscribers.remove(connectionId);
                subscriber.onComplete();
                return CompletableFuture.completedFuture(null);
            }
        }
        Preconditions.isTrue(!subscriber.isCompleted());
        this.logger.log(Level.FINE, "Requesting next message for {0}", subscriber.getCallerId());
        CompletableFuture<MessageResponse> response = subscriber.request();
        this.logger.exiting("process", response);
        return response;
    }

    /**
     * Reads the connection header from the given buffer and, when it is acceptable, creates a
     * subscriber for the requested topic and subscribes it to the topic publisher.
     *
     * <p>The header is rejected when it has no caller id or no topic, when there is no publisher
     * for the topic, or when it does not pass validation against the publisher's message class.
     *
     * @param connectionId id of the connection the subscriber is going to serve
     * @param byteBuffer buffer with the serialized connection header
     * @return the subscriber, or an empty optional when the header is rejected
     */
    private Optional<TopicPublisherSubscriber> registerSubscriber(final int connectionId, ByteBuffer byteBuffer) {
        ConnectionHeaderReader headerReader = new ConnectionHeaderReader(new DataInputStream(new ByteBufferInputStream(byteBuffer)));
        ConnectionHeader connectionHeader = (ConnectionHeader) Unchecked.get(headerReader::read);
        this.logger.log(Level.FINE, "Incoming connection from {0}", connectionHeader.getCallerId());
        if (connectionHeader.getCallerId().isEmpty()) {
            this.logger.log(Level.FINE, "Caller id is empty, closing...");
            return Optional.empty();
        }
        if (connectionHeader.getTopic().isEmpty()) {
            this.logger.log(Level.FINE, "Topic is empty, closing...");
            return Optional.empty();
        }
        final String callerId = connectionHeader.getCallerId().get();
        final String topic = connectionHeader.getTopic().get();
        Optional<TopicPublisher<?>> publisher = this.publishersManager.getPublisher(topic);
        if (publisher.isEmpty()) {
            this.logger.log(Level.FINE, "No publishers found for topic {0}, closing...", topic);
            return Optional.empty();
        }
        final TopicPublisher<?> topicPublisher = publisher.get();
        if (!this.headerValidator.validate(topicPublisher.getMessageClass(), connectionHeader)) {
            this.logger.log(Level.FINE, "Requested message validation error, closing...");
            return Optional.empty();
        }
        TopicPublisherSubscriber subscriber = new TopicPublisherSubscriber(this.tracingToken, callerId, topic, topicPublisher.getMessageClass(), this.utils) {
            @Override // id.jros1client.ros.transport.TopicPublisherSubscriber, java.util.concurrent.Flow.Subscriber
            public void onError(Throwable th) {
                TcpRosServer.this.logger.warning("Publisher of topic {0} for caller {1} throwed error {2}: {3}", new Object[] {topic, callerId, th.getClass(), th.getMessage()});
                // Forget the failed subscriber, let the publisher know about the failure and
                // mark the connection so that its next request is not served.
                TcpRosServer.this.subscribers.remove(connectionId);
                topicPublisher.onPublishError(th);
                TcpRosServer.this.closedConnections.add(connectionId);
                super.onError(th);
            }
        };
        topicPublisher.subscribe(subscriber);
        this.logger.log(Level.FINE, "Received connection header {0}", connectionHeader);
        return Optional.of(subscriber);
    }

    /**
     * Tells whether the server is not running.
     *
     * @return {@code true} before {@link #start()} and after {@link #close()}
     */
    public boolean isClosed() {
        return !this.isStarted;
    }
}
