/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.server;

import io.moquette.connections.IConnectionsManager;
import io.moquette.connections.MqttConnectionMetrics;
import io.moquette.connections.MqttSession;
import io.moquette.connections.MqttSubscription;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.netty.metrics.BytesMetrics;
import io.moquette.server.netty.metrics.MessageMetrics;
import io.moquette.spi.ClientSession;
import io.moquette.spi.impl.SessionsRepository;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionDescriptorStore
implements IConnectionsManager {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionDescriptorStore.class);
    private final ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors = new ConcurrentHashMap<String, ConnectionDescriptor>();
    private final SessionsRepository sessionsRepository;

    public ConnectionDescriptorStore(SessionsRepository sessionsRepository) {
        this.sessionsRepository = sessionsRepository;
    }

    public boolean sendMessage(MqttMessage message, Integer messageID, String clientID) {
        MqttMessageType messageType = message.fixedHeader().messageType();
        try {
            if (messageID != null) {
                LOG.info("Sending {} message CId=<{}>, messageId={}", new Object[]{messageType, clientID, messageID});
            } else {
                LOG.debug("Sending {} message CId=<{}>", (Object)messageType, (Object)clientID);
            }
            ConnectionDescriptor descriptor = (ConnectionDescriptor)this.connectionDescriptors.get(clientID);
            if (descriptor == null) {
                if (messageID != null) {
                    LOG.error("Client has just disconnected. {} message could not be sent. CId=<{}>, messageId={}", new Object[]{messageType, clientID, messageID});
                } else {
                    LOG.error("Client has just disconnected. {} could not be sent. CId=<{}>", (Object)messageType, (Object)clientID);
                }
                return false;
            }
            descriptor.writeAndFlush(message);
            return true;
        }
        catch (Throwable e) {
            String errorMsg = "Unable to send " + messageType + " message. CId=<" + clientID + ">";
            if (messageID != null) {
                errorMsg = errorMsg + ", messageId=" + messageID;
            }
            LOG.error(errorMsg, e);
            return false;
        }
    }

    public ConnectionDescriptor addConnection(ConnectionDescriptor descriptor) {
        return this.connectionDescriptors.putIfAbsent(descriptor.clientID, descriptor);
    }

    public boolean removeConnection(ConnectionDescriptor descriptor) {
        return this.connectionDescriptors.remove(descriptor.clientID, descriptor);
    }

    public ConnectionDescriptor getConnection(String clientID) {
        return (ConnectionDescriptor)this.connectionDescriptors.get(clientID);
    }

    @Override
    public boolean isConnected(String clientID) {
        return this.connectionDescriptors.containsKey(clientID);
    }

    @Override
    public int getActiveConnectionsNo() {
        return this.connectionDescriptors.size();
    }

    @Override
    public Collection<String> getConnectedClientIds() {
        return this.connectionDescriptors.keySet();
    }

    @Override
    public boolean closeConnection(String clientID, boolean closeImmediately) {
        ConnectionDescriptor descriptor = (ConnectionDescriptor)this.connectionDescriptors.get(clientID);
        if (descriptor == null) {
            LOG.error("Connection descriptor doesn't exist. MQTT connection cannot be closed. CId=<{}>, closeImmediately={}", (Object)clientID, (Object)closeImmediately);
            return false;
        }
        if (closeImmediately) {
            descriptor.abort();
            return true;
        }
        return descriptor.close();
    }

    @Override
    public Collection<MqttSession> getSessions() {
        LOG.info("Retrieving status of all sessions.");
        ArrayList<MqttSession> result = new ArrayList<MqttSession>();
        for (ClientSession session : this.sessionsRepository.getAllSessions()) {
            result.add(this.buildMqttSession(session));
        }
        return result;
    }

    private MqttSession buildMqttSession(ClientSession session) {
        MqttSession result = new MqttSession();
        ArrayList<MqttSubscription> mqttSubscriptions = new ArrayList<MqttSubscription>();
        for (Subscription subscription : session.getSubscriptions()) {
            mqttSubscriptions.add(new MqttSubscription(subscription.getRequestedQos().toString(), subscription.getClientId(), subscription.getTopicFilter().toString(), subscription.isActive()));
        }
        result.setActiveSubscriptions(mqttSubscriptions);
        result.setCleanSession(session.isCleanSession());
        ConnectionDescriptor descriptor = this.getConnection(session.clientID);
        if (descriptor != null) {
            result.setConnectionEstablished(true);
            BytesMetrics bytesMetrics = descriptor.getBytesMetrics();
            MessageMetrics messageMetrics = descriptor.getMessageMetrics();
            result.setConnectionMetrics(new MqttConnectionMetrics(bytesMetrics.readBytes(), bytesMetrics.wroteBytes(), messageMetrics.messagesRead(), messageMetrics.messagesWrote()));
        } else {
            result.setConnectionEstablished(false);
        }
        result.setPendingPublishMessagesNo(session.getPendingPublishMessagesNo());
        result.setSecondPhaseAckPendingMessages(session.countPubReleaseWaitingPubComplete());
        result.setInflightMessages(session.getInflightMessagesNo());
        return result;
    }
}

