/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.spi.impl;

import io.moquette.server.ConnectionDescriptorStore;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.impl.DebugUtils;
import io.moquette.spi.impl.PersistentQueueMessageSender;
import io.moquette.spi.impl.ProtocolProcessor;
import io.moquette.spi.impl.SessionsRepository;
import io.moquette.spi.impl.subscriptions.ISubscriptionsDirectory;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MessagesPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(MessagesPublisher.class);
    private final ConnectionDescriptorStore connectionDescriptors;
    private final PersistentQueueMessageSender messageSender;
    private final ISubscriptionsDirectory subscriptions;
    private SessionsRepository sessionsRepository;

    public MessagesPublisher(ConnectionDescriptorStore connectionDescriptors, PersistentQueueMessageSender messageSender, ISubscriptionsDirectory subscriptions, SessionsRepository sessionsRepository) {
        this.connectionDescriptors = connectionDescriptors;
        this.messageSender = messageSender;
        this.subscriptions = subscriptions;
        this.sessionsRepository = sessionsRepository;
    }

    static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
        return MessagesPublisher.notRetainedPublishWithMessageId(topic, qos, message, 0);
    }

    private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
        return new MqttPublishMessage(fixedHeader, varHeader, message);
    }

    void publish2Subscribers(IMessagesStore.StoredMessage pubMsg, Topic topic, int messageID) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Sending publish message to subscribers. ClientId={}, topic={}, messageId={}, payload={}, subscriptionTree={}", new Object[]{pubMsg.getClientID(), topic, messageID, DebugUtils.payload2Str(pubMsg.getPayload()), this.subscriptions.dumpTree()});
        } else {
            LOG.info("Sending publish message to subscribers. ClientId={}, topic={}, messageId={}", new Object[]{pubMsg.getClientID(), topic, messageID});
        }
        this.publish2Subscribers(pubMsg, topic);
    }

    void publish2Subscribers(IMessagesStore.StoredMessage pubMsg, Topic topic) {
        List<Subscription> topicMatchingSubscriptions = this.subscriptions.matches(topic);
        String topic1 = pubMsg.getTopic();
        MqttQoS publishingQos = pubMsg.getQos();
        ByteBuf origPayload = pubMsg.getPayload();
        for (Subscription sub : topicMatchingSubscriptions) {
            MqttQoS qos = ProtocolProcessor.lowerQosToTheSubscriptionDesired(sub, publishingQos);
            ClientSession targetSession = this.sessionsRepository.sessionForClient(sub.getClientId());
            boolean targetIsActive = this.connectionDescriptors.isConnected(sub.getClientId());
            if (targetIsActive) {
                MqttPublishMessage publishMsg;
                LOG.debug("Sending PUBLISH message to active subscriber. CId={}, topicFilter={}, qos={}", new Object[]{sub.getClientId(), sub.getTopicFilter(), qos});
                ByteBuf payload = origPayload.retainedDuplicate();
                if (qos != MqttQoS.AT_MOST_ONCE) {
                    int messageId = targetSession.inFlightAckWaiting(pubMsg);
                    publishMsg = MessagesPublisher.notRetainedPublishWithMessageId(topic1, qos, payload, messageId);
                } else {
                    publishMsg = MessagesPublisher.notRetainedPublish(topic1, qos, payload);
                }
                this.messageSender.sendPublish(targetSession, publishMsg);
                continue;
            }
            if (targetSession.isCleanSession()) continue;
            LOG.debug("Storing pending PUBLISH inactive message. CId={}, topicFilter={}, qos={}", new Object[]{sub.getClientId(), sub.getTopicFilter(), qos});
            targetSession.enqueue(pubMsg);
        }
    }
}

