/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.spi.impl;

import io.moquette.server.ConnectionDescriptorStore;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.impl.BrokerInterceptor;
import io.moquette.spi.impl.DebugUtils;
import io.moquette.spi.impl.MessagesPublisher;
import io.moquette.spi.impl.ProtocolProcessor;
import io.moquette.spi.impl.QosPublishHandler;
import io.moquette.spi.impl.SessionsRepository;
import io.moquette.spi.impl.Utils;
import io.moquette.spi.impl.subscriptions.ISubscriptionsDirectory;
import io.moquette.spi.impl.subscriptions.Topic;
import io.moquette.spi.security.IAuthorizator;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Qos2PublishHandler
extends QosPublishHandler {
    private static final Logger LOG = LoggerFactory.getLogger(Qos2PublishHandler.class);
    private final ISubscriptionsDirectory subscriptions;
    private final IMessagesStore m_messagesStore;
    private final BrokerInterceptor m_interceptor;
    private final ConnectionDescriptorStore connectionDescriptors;
    private final MessagesPublisher publisher;
    private final SessionsRepository sessionsRepository;

    public Qos2PublishHandler(IAuthorizator authorizator, ISubscriptionsDirectory subscriptions, IMessagesStore messagesStore, BrokerInterceptor interceptor, ConnectionDescriptorStore connectionDescriptors, MessagesPublisher messagesPublisher, SessionsRepository sessionsRepository) {
        super(authorizator);
        this.subscriptions = subscriptions;
        this.m_messagesStore = messagesStore;
        this.m_interceptor = interceptor;
        this.connectionDescriptors = connectionDescriptors;
        this.publisher = messagesPublisher;
        this.sessionsRepository = sessionsRepository;
    }

    void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
        Topic topic = new Topic(msg.variableHeader().topicName());
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        if (!this.m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", (Object)clientID, (Object)topic);
            return;
        }
        int messageID = msg.variableHeader().messageId();
        IMessagesStore.StoredMessage toStoreMsg = ProtocolProcessor.asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);
        LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", new Object[]{clientID, topic, messageID});
        if (LOG.isTraceEnabled()) {
            LOG.trace("payload={}, subs Tree={}", (Object)DebugUtils.payload2Str(toStoreMsg.getPayload()), (Object)this.subscriptions.dumpTree());
        }
        this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);
        this.sendPubRec(clientID, messageID);
        this.m_interceptor.notifyTopicPublished(msg, clientID, username);
    }

    void processPubRel(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = Utils.messageId(msg);
        LOG.info("Processing PUBREL message. CId={}, messageId={}", (Object)clientID, (Object)messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
        if (evt == null) {
            LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", (Object)clientID, (Object)messageID);
            throw new IllegalArgumentException("Can't find inbound inflight message");
        }
        Topic topic = new Topic(evt.getTopic());
        this.publisher.publish2Subscribers(evt, topic, messageID);
        if (evt.isRetained()) {
            if (evt.getPayload().readableBytes() == 0) {
                this.m_messagesStore.cleanRetained(topic);
            } else {
                this.m_messagesStore.storeRetained(topic, evt);
            }
        }
        this.sendPubComp(clientID, messageID);
    }

    private void sendPubRec(String clientID, int messageID) {
        LOG.debug("Sending PUBREC message. CId={}, messageId={}", (Object)clientID, (Object)messageID);
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage pubRecMessage = new MqttMessage(fixedHeader, (Object)MqttMessageIdVariableHeader.from((int)messageID));
        this.connectionDescriptors.sendMessage(pubRecMessage, messageID, clientID);
    }

    private void sendPubComp(String clientID, int messageID) {
        LOG.debug("Sending PUBCOMP message. CId={}, messageId={}", (Object)clientID, (Object)messageID);
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage pubCompMessage = new MqttMessage(fixedHeader, (Object)MqttMessageIdVariableHeader.from((int)messageID));
        this.connectionDescriptors.sendMessage(pubCompMessage, messageID, clientID);
    }
}

