/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.spi.impl;

import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.ConnectionDescriptorStore;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.ClientSession;
import io.moquette.spi.EnqueuedMessage;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.ISubscriptionsStore;
import io.moquette.spi.impl.BrokerInterceptor;
import io.moquette.spi.impl.InternalRepublisher;
import io.moquette.spi.impl.MessagesPublisher;
import io.moquette.spi.impl.PersistentQueueMessageSender;
import io.moquette.spi.impl.Qos0PublishHandler;
import io.moquette.spi.impl.Qos1PublishHandler;
import io.moquette.spi.impl.Qos2PublishHandler;
import io.moquette.spi.impl.SessionsRepository;
import io.moquette.spi.impl.Utils;
import io.moquette.spi.impl.subscriptions.ISubscriptionsDirectory;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.Topic;
import io.moquette.spi.security.IAuthenticator;
import io.moquette.spi.security.IAuthorizator;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.timeout.IdleStateHandler;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProtocolProcessor {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolProcessor.class);
    // Store of connection descriptors; assigned in init().
    private ConnectionDescriptorStore connectionDescriptors;
    // SUBSCRIBE requests currently being handled, keyed by (client id, message id); see processSubscribe().
    private ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;
    // Subscriptions directory that processSubscribe()/processUnsubscribe() add to and remove from.
    private ISubscriptionsDirectory subscriptions;
    // Obtained from the sessions store in init(); used to wipe a client's saved subscriptions on disconnect.
    private ISubscriptionsStore subscriptionStore;
    // When false, CONNECT packets without usable credentials are refused (see login()).
    private boolean allowAnonymous;
    // When true, an empty client id is accepted for clean sessions and replaced by a generated one.
    private boolean allowZeroByteClientId;
    // Consulted in doVerify() to decide whether a client may read a topic.
    private IAuthorizator m_authorizator;
    // Message store; used in this class for retained messages.
    private IMessagesStore m_messagesStore;
    // Sessions store handed to init(); the subscription store is derived from it.
    private ISessionsStore m_sessionsStore;
    // Validates client id / username / password triples in login().
    private IAuthenticator m_authenticator;
    // Receives notifications about connects, disconnects, acknowledgements and unsubscriptions.
    private BrokerInterceptor m_interceptor;
    // Per-QoS handlers that processPublish() dispatches to.
    private Qos0PublishHandler qos0PublishHandler;
    private Qos1PublishHandler qos1PublishHandler;
    private Qos2PublishHandler qos2PublishHandler;
    // Delivers stored messages to the subscribers of a topic.
    private MessagesPublisher messagesPublisher;
    // Re-sends stored publishes to a client that reconnects without the clean-session flag.
    private InternalRepublisher internalRepublisher;
    // Package-private: lookup and lifecycle of client sessions.
    SessionsRepository sessionsRepository;
    // Last-will messages keyed by client id; filled by storeWillMessage().
    private ConcurrentMap<String, WillMessage> m_willStore = new ConcurrentHashMap<String, WillMessage>();

    /**
     * Creates an unconfigured processor. No collaborator is assigned here; they are
     * set by the {@code init} overloads.
     */
    ProtocolProcessor() {
    }

    /**
     * Convenience overload that delegates to the nine-argument {@code init} with
     * {@code allowZeroByteClientId} fixed to {@code false}.
     */
    public void init(ISubscriptionsDirectory subscriptions, IMessagesStore storageService, ISessionsStore sessionsStore, IAuthenticator authenticator, boolean allowAnonymous, IAuthorizator authorizator, BrokerInterceptor interceptor, SessionsRepository sessionsRepository) {
        this.init(subscriptions, storageService, sessionsStore, authenticator, allowAnonymous, false, authorizator, interceptor, sessionsRepository);
    }

    /**
     * Overload that builds a fresh {@link ConnectionDescriptorStore} from the given sessions
     * repository and delegates to the package-private ten-argument {@code init}.
     */
    public void init(ISubscriptionsDirectory subscriptions, IMessagesStore storageService, ISessionsStore sessionsStore, IAuthenticator authenticator, boolean allowAnonymous, boolean allowZeroByteClientId, IAuthorizator authorizator, BrokerInterceptor interceptor, SessionsRepository sessionsRepository) {
        this.init(new ConnectionDescriptorStore(sessionsRepository), subscriptions, storageService, sessionsStore, authenticator, allowAnonymous, allowZeroByteClientId, authorizator, interceptor, sessionsRepository);
    }

    /**
     * Assigns every collaborator and builds the helper objects derived from them: the
     * messages publisher, the three QoS publish handlers and the internal republisher.
     *
     * @param connectionDescriptors store of connection descriptors to use
     * @param subscriptions subscriptions directory
     * @param storageService message store
     * @param sessionsStore sessions store; also supplies the subscription store
     * @param authenticator credential validator
     * @param allowAnonymous whether clients without credentials are accepted
     * @param allowZeroByteClientId whether an empty client id is accepted
     * @param authorizator topic read/write authorizer
     * @param interceptor broker interceptor to notify
     * @param sessionsRepository repository of client sessions
     */
    void init(ConnectionDescriptorStore connectionDescriptors, ISubscriptionsDirectory subscriptions, IMessagesStore storageService, ISessionsStore sessionsStore, IAuthenticator authenticator, boolean allowAnonymous, boolean allowZeroByteClientId, IAuthorizator authorizator, BrokerInterceptor interceptor, SessionsRepository sessionsRepository) {
        LOG.info("Initializing MQTT protocol processor...");

        // Flags and directly injected collaborators.
        this.allowAnonymous = allowAnonymous;
        this.allowZeroByteClientId = allowZeroByteClientId;
        this.m_authorizator = authorizator;
        this.m_interceptor = interceptor;
        this.subscriptions = subscriptions;
        this.connectionDescriptors = connectionDescriptors;
        this.subscriptionInCourse = new ConcurrentHashMap<>();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Initial subscriptions tree={}", (Object) subscriptions.dumpTree());
        }

        // Stores and the session repository.
        this.m_messagesStore = storageService;
        this.m_authenticator = authenticator;
        this.m_sessionsStore = sessionsStore;
        this.subscriptionStore = sessionsStore.subscriptionStore();
        this.sessionsRepository = sessionsRepository;

        LOG.info("Initializing messages publisher...");
        final PersistentQueueMessageSender queueSender = new PersistentQueueMessageSender(this.connectionDescriptors);
        this.messagesPublisher = new MessagesPublisher(connectionDescriptors, queueSender, subscriptions, this.sessionsRepository);

        LOG.info("Initializing QoS publish handlers...");
        this.qos0PublishHandler = new Qos0PublishHandler(
                this.m_authorizator, this.m_messagesStore, this.m_interceptor, this.messagesPublisher);
        this.qos1PublishHandler = new Qos1PublishHandler(
                this.m_authorizator, this.m_messagesStore, this.m_interceptor, this.connectionDescriptors,
                this.messagesPublisher);
        this.qos2PublishHandler = new Qos2PublishHandler(
                this.m_authorizator, subscriptions, this.m_messagesStore, this.m_interceptor,
                this.connectionDescriptors, this.messagesPublisher, this.sessionsRepository);

        LOG.info("Initializing internal republisher...");
        this.internalRepublisher = new InternalRepublisher(queueSender);
    }

    /**
     * Handles an incoming CONNECT packet.
     *
     * <p>Steps, in order: reject unsupported protocol levels; reject (or replace with a generated
     * id) an empty client id; authenticate; register the connection descriptor, aborting any
     * existing connection with the same client id; configure keep-alive; store the will message;
     * send CONNACK; notify the interceptor; create or load the session; republish stored
     * messages; mark the connection as established. Any failed step closes the channel and
     * stops processing.
     *
     * @param channel the channel the CONNECT arrived on
     * @param msg the decoded CONNECT message
     */
    public void processConnect(Channel channel, MqttConnectMessage msg) {
        MqttConnectPayload payload = msg.payload();
        String clientId = payload.clientIdentifier();
        LOG.info("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());

        final int protocolLevel = msg.variableHeader().version();
        if (protocolLevel != MqttVersion.MQTT_3_1.protocolLevel() && protocolLevel != MqttVersion.MQTT_3_1_1.protocolLevel()) {
            MqttConnAckMessage badProto = this.connAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
            LOG.error("MQTT protocol version is not valid. CId={}", clientId);
            channel.writeAndFlush(badProto);
            channel.close();
            return;
        }

        boolean cleanSession = msg.variableHeader().isCleanSession();
        if (clientId == null || clientId.length() == 0) {
            // An empty id is only acceptable for a clean session and when the broker allows it.
            if (!cleanSession || !this.allowZeroByteClientId) {
                MqttConnAckMessage badId = this.connAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                channel.writeAndFlush(badId);
                channel.close();
                LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
                return;
            }
            clientId = UUID.randomUUID().toString().replace("-", "");
            LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientId, payload.userName());
        }

        if (!this.login(channel, msg, clientId)) {
            channel.close();
            return;
        }

        ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
        ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
        if (existing != null) {
            // Same client id already connected: drop the old connection and register the new one.
            LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
            existing.abort();
            this.connectionDescriptors.removeConnection(existing);
            this.connectionDescriptors.addConnection(descriptor);
        }

        this.initializeKeepAliveTimeout(channel, msg, clientId);
        this.storeWillMessage(msg, clientId);
        if (!this.sendAck(descriptor, msg, clientId)) {
            channel.close();
            return;
        }

        this.m_interceptor.notifyClientConnected(msg);
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.SENDACK, ConnectionDescriptor.ConnectionState.SESSION_CREATED)) {
            channel.close();
            return;
        }

        ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
        if (!this.republish(descriptor, msg, clientSession)) {
            channel.close();
            return;
        }

        boolean established = descriptor.assignState(ConnectionDescriptor.ConnectionState.MESSAGES_REPUBLISHED, ConnectionDescriptor.ConnectionState.ESTABLISHED);
        if (!established) {
            channel.close();
            // Stop here so the success message below is not logged for a connection we just closed.
            return;
        }
        LOG.info("CONNECT message processed CId={}, username={}", clientId, payload.userName());
    }

    /**
     * Builds a CONNACK carrying {@code returnCode} with the session-present flag cleared.
     */
    private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode) {
        return this.connAck(returnCode, false);
    }

    /**
     * Builds a CONNACK carrying {@code returnCode} with the session-present flag set.
     */
    private MqttConnAckMessage connAckWithSessionPresent(MqttConnectReturnCode returnCode) {
        return this.connAck(returnCode, true);
    }

    /**
     * Builds a CONNACK message.
     *
     * @param returnCode the connect return code to report
     * @param sessionPresent value of the session-present flag
     * @return the assembled CONNACK
     */
    private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
        return new MqttConnAckMessage(
                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                new MqttConnAckVariableHeader(returnCode, sessionPresent));
    }

    /**
     * Authenticates a CONNECT request.
     *
     * <p>When a username is supplied the authenticator is consulted (with a {@code null}
     * password if none was sent and anonymous mode is enabled). When no username is supplied
     * the request is accepted only in anonymous mode. On rejection a "bad username or password"
     * CONNACK is written; closing the channel is left to the caller.
     *
     * @param channel channel of the connecting client
     * @param msg the CONNECT message
     * @param clientId effective client id
     * @return {@code true} when the client may proceed
     */
    private boolean login(Channel channel, MqttConnectMessage msg, String clientId) {
        if (msg.variableHeader().hasUserName()) {
            final String userName = msg.payload().userName();
            byte[] pwd = null;
            if (msg.variableHeader().hasPassword()) {
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                pwd = msg.payload().password().getBytes(StandardCharsets.UTF_8);
            } else if (!this.allowAnonymous) {
                LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
                this.failedCredentials(channel);
                return false;
            }
            if (!this.m_authenticator.checkValid(clientId, userName, pwd)) {
                // The password is deliberately not logged: credentials must not end up in log files.
                LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, userName);
                this.failedCredentials(channel);
                return false;
            }
            NettyUtils.userName(channel, userName);
        } else if (!this.allowAnonymous) {
            LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
            this.failedCredentials(channel);
            return false;
        }
        return true;
    }

    /**
     * Moves the descriptor from DISCONNECTED to SENDACK and writes the accepting CONNACK.
     *
     * <p>The session-present flag is set only when a session is already stored for the client
     * and the CONNECT did not request a clean session. When a stored session exists and a clean
     * session was requested, the stored session's subscriptions are removed from the
     * subscriptions directory.
     *
     * @return {@code false} when the state transition fails, {@code true} otherwise
     */
    private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, String clientId) {
        LOG.info("Sending CONNACK. CId={}", clientId);
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.DISCONNECTED, ConnectionDescriptor.ConnectionState.SENDACK)) {
            return false;
        }

        final ClientSession storedSession = this.sessionsRepository.sessionForClient(clientId);
        final boolean sessionStored = storedSession != null;
        final boolean cleanRequested = msg.variableHeader().isCleanSession();

        final MqttConnAckMessage accepted;
        if (sessionStored && !cleanRequested) {
            accepted = this.connAckWithSessionPresent(MqttConnectReturnCode.CONNECTION_ACCEPTED);
        } else {
            accepted = this.connAck(MqttConnectReturnCode.CONNECTION_ACCEPTED);
        }
        descriptor.writeAndFlush(accepted);
        LOG.info("CONNACK has been sent. CId={}", clientId);

        if (sessionStored && cleanRequested) {
            // A clean session discards whatever the previous session had subscribed to.
            for (Subscription stale : storedSession.getSubscriptions()) {
                this.subscriptions.removeSubscription(stale.getTopicFilter(), clientId);
            }
        }
        return true;
    }

    /**
     * Records keep-alive, clean-session flag and client id on the channel and installs an idle
     * handler whose timeout is 1.5 times the keep-alive requested in the CONNECT.
     */
    private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
        final int keepAliveSeconds = msg.variableHeader().keepAliveTimeSeconds();
        LOG.info("Configuring connection. CId={}", clientId);

        NettyUtils.keepAlive(channel, keepAliveSeconds);
        NettyUtils.cleanSession(channel, msg.variableHeader().isCleanSession());
        NettyUtils.clientID(channel, clientId);

        final int idleSeconds = Math.round((float) keepAliveSeconds * 1.5f);
        this.setIdleTime(channel.pipeline(), idleSeconds);

        LOG.debug("The connection has been configured CId={}, keepAlive={}, removeTemporaryQoS2={}, idleTime={}",
                clientId, keepAliveSeconds, msg.variableHeader().isCleanSession(), idleSeconds);
    }

    /**
     * Stores the client's last-will message in the will store when the CONNECT carries the
     * will flag; does nothing otherwise.
     *
     * @param msg the CONNECT message
     * @param clientId key under which the will is stored
     */
    private void storeWillMessage(MqttConnectMessage msg, String clientId) {
        if (!msg.variableHeader().isWillFlag()) {
            return;
        }
        MqttQoS willQos = MqttQoS.valueOf(msg.variableHeader().willQos());
        LOG.info("Configuring MQTT last will and testament CId={}, willQos={}, willTopic={}, willRetain={}",
                clientId, willQos, msg.payload().willTopic(), msg.variableHeader().isWillRetain());
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        byte[] willPayload = msg.payload().willMessage().getBytes(StandardCharsets.UTF_8);
        // wrap() yields the same position/limit/backing array as allocate + put + flip.
        ByteBuffer bb = ByteBuffer.wrap(willPayload);
        WillMessage will = new WillMessage(msg.payload().willTopic(), bb, msg.variableHeader().isWillRetain(), willQos);
        this.m_willStore.put(clientId, will);
        LOG.info("MQTT last will and testament has been configured. CId={}", clientId);
    }

    /**
     * Moves the descriptor from SESSION_CREATED to MESSAGES_REPUBLISHED, republishes the
     * session's stored publishes unless a clean session was requested, and sets up the
     * descriptor's auto flusher with a 500 ms interval.
     *
     * @return {@code false} when the state transition fails, {@code true} otherwise
     */
    private boolean republish(ConnectionDescriptor descriptor, MqttConnectMessage msg, ClientSession clientSession) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.SESSION_CREATED, ConnectionDescriptor.ConnectionState.MESSAGES_REPUBLISHED)) {
            return false;
        }
        final boolean keepSession = !msg.variableHeader().isCleanSession();
        if (keepSession) {
            LOG.info("Republishing stored publish events. CId={}", clientSession.clientID);
            this.internalRepublisher.publishStored(clientSession);
        }
        descriptor.setupAutoFlusher(500);
        return true;
    }

    /**
     * Writes a CONNACK refusing the connection for bad username or password and logs the event.
     */
    private void failedCredentials(Channel session) {
        final MqttConnAckMessage refusal = this.connAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        session.writeAndFlush(refusal);
        LOG.info("Client {} failed to connect with bad username or password.", session);
    }

    /**
     * Installs, at the head of the pipeline, an idle-state handler with the given reader idle
     * time, replacing any handler already registered under the same name.
     */
    private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
        final boolean alreadyInstalled = pipeline.names().contains("idleStateHandler");
        if (alreadyInstalled) {
            pipeline.remove("idleStateHandler");
        }
        final IdleStateHandler idleHandler = new IdleStateHandler(idleTime, 0, 0);
        pipeline.addFirst("idleStateHandler", idleHandler);
    }

    /**
     * Handles a PUBACK from a client: marks the in-flight message as acknowledged in the
     * client's session and notifies the interceptor.
     *
     * <p>A PUBACK for a client without a session, or for a message id with no in-flight
     * message, is logged and ignored instead of failing with a {@link NullPointerException}.
     *
     * @param channel channel the PUBACK arrived on
     * @param msg the PUBACK message
     */
    public void processPubAck(Channel channel, MqttPubAckMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = msg.variableHeader().messageId();
        String username = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", messageID);

        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        if (targetSession == null) {
            LOG.warn("Received PUBACK but no session exists for the client. CId={}, messageId={}", clientID, messageID);
            return;
        }
        IMessagesStore.StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);
        if (inflightMsg == null) {
            // Defensive: presumably an unknown or already acknowledged id yields null - verify against ClientSession.
            LOG.warn("Received PUBACK for an unknown in-flight message. CId={}, messageId={}", clientID, messageID);
            return;
        }
        String topic = inflightMsg.getTopic();
        InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username, messageID);
        this.m_interceptor.notifyMessageAcknowledged(wrapped);
    }

    /**
     * Converts a PUBLISH message into a stored message carrying the same payload bytes, QoS,
     * topic name and retain flag.
     *
     * @param msg the PUBLISH message to convert
     * @return the equivalent stored message
     */
    public static IMessagesStore.StoredMessage asStoredMessage(MqttPublishMessage msg) {
        final byte[] body = Utils.readBytesAndRewind(msg.payload());
        final IMessagesStore.StoredMessage result =
                new IMessagesStore.StoredMessage(body, msg.fixedHeader().qosLevel(), msg.variableHeader().topicName());
        result.setRetained(msg.fixedHeader().isRetain());
        return result;
    }

    /**
     * Converts a will message into a stored message using the will's backing payload array,
     * QoS, topic and retain flag.
     */
    private static IMessagesStore.StoredMessage asStoredMessage(WillMessage will) {
        final byte[] body = will.getPayload().array();
        final IMessagesStore.StoredMessage result = new IMessagesStore.StoredMessage(body, will.getQos(), will.getTopic());
        result.setRetained(will.isRetained());
        return result;
    }

    /**
     * Dispatches an incoming PUBLISH to the handler matching its QoS level; an unrecognised
     * level is only logged.
     *
     * @param channel channel the PUBLISH arrived on
     * @param msg the PUBLISH message
     */
    public void processPublish(Channel channel, MqttPublishMessage msg) {
        final MqttQoS requestedQos = msg.fixedHeader().qosLevel();
        final String publisherId = NettyUtils.clientID(channel);
        LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}",
                publisherId, msg.variableHeader().topicName(), msg.variableHeader().messageId(), requestedQos);

        switch (requestedQos) {
            case AT_MOST_ONCE:
                this.qos0PublishHandler.receivedPublishQos0(channel, msg);
                return;
            case AT_LEAST_ONCE:
                this.qos1PublishHandler.receivedPublishQos1(channel, msg);
                return;
            case EXACTLY_ONCE:
                this.qos2PublishHandler.receivedPublishQos2(channel, msg);
                return;
            default:
                LOG.error("Unknown QoS-Type:{}", requestedQos);
        }
    }

    /**
     * Publishes a message to the matching subscribers and then applies the retain flag:
     * a retained message with QoS 0 or an empty payload clears the retained message of the
     * topic, any other retained message is stored.
     *
     * @param msg the message to publish
     * @param clientId publisher id recorded on the stored message; {@code "BROKER_SELF"} is
     *                 used when it is {@code null} or empty
     */
    public void internalPublish(MqttPublishMessage msg, String clientId) {
        final MqttQoS qos = msg.fixedHeader().qosLevel();
        final Topic topic = new Topic(msg.variableHeader().topicName());
        LOG.info("Sending PUBLISH message. Topic={}, qos={}", topic, qos);

        final IMessagesStore.StoredMessage toStoreMsg = ProtocolProcessor.asStoredMessage(msg);
        final boolean noPublisherId = clientId == null || clientId.isEmpty();
        toStoreMsg.setClientID(noPublisherId ? "BROKER_SELF" : clientId);

        this.messagesPublisher.publish2Subscribers(toStoreMsg, topic);

        if (msg.fixedHeader().isRetain()) {
            final boolean clearsRetained = qos == MqttQoS.AT_MOST_ONCE || msg.payload().readableBytes() == 0;
            if (clearsRetained) {
                this.m_messagesStore.cleanRetained(topic);
            } else {
                this.m_messagesStore.storeRetained(topic, toStoreMsg);
            }
        }
    }

    /**
     * Publishes a client's will message to the subscribers of its topic and stores it as the
     * topic's retained message when the will is flagged as retained.
     */
    private void forwardPublishWill(WillMessage will, String clientID) {
        LOG.info("Publishing will message. CId={}, topic={}", clientID, will.getTopic());

        final IMessagesStore.StoredMessage willAsStored = ProtocolProcessor.asStoredMessage(will);
        willAsStored.setClientID(clientID);
        final Topic willTopic = new Topic(willAsStored.getTopic());

        this.messagesPublisher.publish2Subscribers(willAsStored, willTopic);
        if (!will.isRetained()) {
            return;
        }
        this.m_messagesStore.storeRetained(willTopic, willAsStored);
    }

    /**
     * Caps {@code qos} at the QoS requested by the subscription.
     *
     * @param sub the subscription whose requested QoS is the upper bound
     * @param qos the QoS of the message
     * @return {@code qos} when it does not exceed the subscription's requested QoS, otherwise
     *         the subscription's requested QoS
     */
    static MqttQoS lowerQosToTheSubscriptionDesired(Subscription sub, MqttQoS qos) {
        final int publishedLevel = qos.value();
        final MqttQoS requested = sub.getRequestedQos();
        return publishedLevel > requested.value() ? requested : qos;
    }

    /**
     * Handles a PUBREL by delegating to the QoS 2 publish handler.
     *
     * @param channel channel the PUBREL arrived on
     * @param msg the PUBREL message
     */
    public void processPubRel(Channel channel, MqttMessage msg) {
        this.qos2PublishHandler.processPubRel(channel, msg);
    }

    /**
     * Handles a PUBREC from a client: acknowledges the in-flight message, moves it to the
     * session's second-phase acknowledgement wait state and replies with PUBREL.
     *
     * <p>A PUBREC from a client without a session is logged and ignored instead of failing
     * with a {@link NullPointerException}.
     *
     * @param channel channel the PUBREC arrived on
     * @param msg the PUBREC message
     */
    public void processPubRec(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = Utils.messageId(msg);
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);

        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        if (targetSession == null) {
            LOG.warn("Received PUBREC but no session exists for the client. CId={}, messageId={}", clientID, messageID);
            return;
        }
        IMessagesStore.StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
        targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);

        // The original repeated the "Processing PUBREC" line here; describe the actual step instead.
        LOG.debug("Sending PUBREL message. CId={}, messageId={}", clientID, messageID);
        // PUBREL carries QoS 1 in its fixed-header flags.
        MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, MqttMessageIdVariableHeader.from(messageID));
        channel.writeAndFlush(pubRelMessage);
    }

    /**
     * Handles a PUBCOMP from a client: completes the released publish in the client's session
     * and notifies the interceptor that the message was acknowledged.
     *
     * <p>A PUBCOMP for a client without a session, or for a message id with no released
     * publish, is logged and ignored instead of failing with a {@link NullPointerException}.
     *
     * @param channel channel the PUBCOMP arrived on
     * @param msg the PUBCOMP message
     */
    public void processPubComp(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = Utils.messageId(msg);
        LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);

        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        if (targetSession == null) {
            LOG.warn("Received PUBCOMP but no session exists for the client. CId={}, messageId={}", clientID, messageID);
            return;
        }
        IMessagesStore.StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
        if (inflightMsg == null) {
            // Defensive: presumably an unknown id yields null - verify against ClientSession.
            LOG.warn("Received PUBCOMP for an unknown message. CId={}, messageId={}", clientID, messageID);
            return;
        }
        String username = NettyUtils.userName(channel);
        String topic = inflightMsg.getTopic();
        InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic, username, messageID);
        this.m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
    }

    /**
     * Handles a DISCONNECT from a client.
     *
     * <p>The descriptor registered for the client id is walked through its shutdown states in
     * order: subscriptions removed, stored messages dropped, will message cleaned and
     * interceptor notified, then closed. If any of the first three steps fails the descriptor is
     * aborted and processing stops. After a successful close the descriptor is removed from the
     * store and the session repository is told the client disconnected.
     *
     * @param channel channel the DISCONNECT arrived on
     * @throws InterruptedException declared by the signature; nothing in this body throws it directly
     */
    public void processDisconnect(Channel channel) throws InterruptedException {
        String clientID = NettyUtils.clientID(channel);
        LOG.info("Processing DISCONNECT message. CId={}", (Object)clientID);
        channel.flush();
        ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID);
        if (existingDescriptor == null) {
            // No descriptor is registered for this client id: just close the channel.
            channel.close();
            return;
        }
        if (existingDescriptor.doesNotUseChannel(channel)) {
            // The registered descriptor is bound to a different channel than the one that sent DISCONNECT.
            LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", (Object)clientID);
            existingDescriptor.abort();
            return;
        }
        if (!this.removeSubscriptions(existingDescriptor, clientID)) {
            LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", (Object)clientID);
            existingDescriptor.abort();
            return;
        }
        if (!this.dropStoredMessages(existingDescriptor, clientID)) {
            LOG.warn("Unable to drop stored messages. Closing connection. CId={}", (Object)clientID);
            existingDescriptor.abort();
            return;
        }
        if (!this.cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) {
            LOG.warn("Unable to drop will message. Closing connection. CId={}", (Object)clientID);
            existingDescriptor.abort();
            return;
        }
        if (!existingDescriptor.close()) {
            // close() returned false; the descriptor is left in the store and processing ends here.
            LOG.info("Connection has been closed. CId={}", (Object)clientID);
            return;
        }
        boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
        if (!stillPresent) {
            // removeConnection() returned false: skip the session-level disconnect.
            LOG.warn("Another descriptor has been inserted. CId={}", (Object)clientID);
            return;
        }
        this.sessionsRepository.disconnect(clientID);
        LOG.info("DISCONNECT message has been processed. CId={}", (Object)clientID);
    }

    /**
     * Moves the descriptor from ESTABLISHED to SUBSCRIPTIONS_REMOVED and, for clean-session
     * connections, wipes the client's saved subscriptions.
     *
     * @return {@code false} when the state transition fails, {@code true} otherwise
     */
    private boolean removeSubscriptions(ConnectionDescriptor descriptor, String clientID) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.ESTABLISHED, ConnectionDescriptor.ConnectionState.SUBSCRIPTIONS_REMOVED)) {
            return false;
        }
        if (!descriptor.cleanSession) {
            return true;
        }
        LOG.info("Removing saved subscriptions. CId={}", descriptor.clientID);
        this.subscriptionStore.wipeSubscriptions(clientID);
        LOG.info("Saved subscriptions have been removed. CId={}", descriptor.clientID);
        return true;
    }

    /**
     * Moves the descriptor from SUBSCRIPTIONS_REMOVED to MESSAGES_DROPPED and drops the
     * session's queue when the session is a clean session.
     *
     * @return {@code false} when the state transition fails, {@code true} otherwise
     */
    private boolean dropStoredMessages(ConnectionDescriptor descriptor, String clientID) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.SUBSCRIPTIONS_REMOVED, ConnectionDescriptor.ConnectionState.MESSAGES_DROPPED)) {
            return false;
        }
        final ClientSession session = this.sessionsRepository.sessionForClient(clientID);
        final boolean discardQueue = session.isCleanSession();
        if (discardQueue) {
            session.dropQueue();
        }
        return true;
    }

    /**
     * Moves the descriptor from MESSAGES_DROPPED to INTERCEPTORS_NOTIFIED, removes the client's
     * will message from the will store and notifies the interceptor of the disconnection.
     *
     * @return {@code false} when the state transition fails, {@code true} otherwise
     */
    private boolean cleanWillMessageAndNotifyInterceptor(ConnectionDescriptor descriptor, String clientID) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.MESSAGES_DROPPED, ConnectionDescriptor.ConnectionState.INTERCEPTORS_NOTIFIED)) {
            return false;
        }
        LOG.info("Removing will message. ClientId={}", descriptor.clientID);
        this.m_willStore.remove(clientID);
        this.m_interceptor.notifyClientDisconnected(clientID, descriptor.getUsername());
        return true;
    }

    /**
     * Handles the loss of a client connection: removes the connection descriptor, publishes
     * the client's will message if one is stored, and notifies the interceptor.
     *
     * @param clientID id of the client whose connection was lost
     * @param channel the channel that went away
     */
    public void processConnectionLost(String clientID, Channel channel) {
        LOG.info("Processing connection lost event. CId={}", clientID);
        ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
        this.connectionDescriptors.removeConnection(oldConnDescr);

        // A single atomic remove replaces containsKey/get/remove, so two concurrent
        // connection-lost events for the same client cannot both publish the will.
        WillMessage will = this.m_willStore.remove(clientID);
        if (will != null) {
            this.forwardPublishWill(will, clientID);
        }

        String username = NettyUtils.userName(channel);
        this.m_interceptor.notifyClientConnectionLost(clientID, username);
    }

    /**
     * Handles an UNSUBSCRIBE: removes each listed topic filter from the subscriptions
     * directory and from the client's session, notifies the interceptor per topic, and replies
     * with UNSUBACK.
     *
     * <p>If a topic filter is invalid the channel is closed and processing stops; filters
     * handled before the invalid one stay removed and no UNSUBACK is sent.
     *
     * @param channel channel the UNSUBSCRIBE arrived on
     * @param msg the UNSUBSCRIBE message
     */
    public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) {
        // Typed list: iterating a raw List as String does not compile.
        List<String> topics = msg.payload().topics();
        String clientID = NettyUtils.clientID(channel);
        LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics);

        ClientSession clientSession = this.sessionsRepository.sessionForClient(clientID);
        // The username is read once up front rather than once per topic.
        String username = NettyUtils.userName(channel);
        for (String t : topics) {
            Topic topic = new Topic(t);
            if (!topic.isValid()) {
                channel.close();
                LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic);
                return;
            }
            LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic);
            this.subscriptions.removeSubscription(topic, clientID);
            clientSession.unsubscribeFrom(topic);
            this.m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username);
        }

        int messageID = msg.variableHeader().messageId();
        // UNSUBACK's fixed-header flag bits are reserved and must be 0, so QoS is AT_MOST_ONCE.
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageID));
        LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID);
        channel.writeAndFlush(ackMessage);
    }

    /**
     * Handles a SUBSCRIBE: verifies each requested topic filter, stores the accepted
     * subscriptions, replies with SUBACK and then publishes matching retained messages.
     *
     * <p>Progress is tracked in {@code subscriptionInCourse} under the key
     * (client id, message id), moving VERIFIED -> STORED -> removed. A SUBSCRIBE whose key is
     * already present is ignored.
     *
     * <p>NOTE(review): if {@code doVerify} or a later step throws, the key is never removed from
     * {@code subscriptionInCourse}, so a later SUBSCRIBE with the same client id and message id
     * would be ignored - confirm whether that is intended.
     *
     * @param channel channel the SUBSCRIBE arrived on
     * @param msg the SUBSCRIBE message
     */
    public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = Utils.messageId((MqttMessage)msg);
        LOG.info("Processing SUBSCRIBE message. CId={}, messageId={}", (Object)clientID, (Object)messageID);
        RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
        // putIfAbsent returns the previous value: non-null means this request is already in progress.
        SubscriptionState currentStatus = this.subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
        if (currentStatus != null) {
            LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}", (Object)clientID, (Object)messageID);
            return;
        }
        String username = NettyUtils.userName(channel);
        List<MqttTopicSubscription> ackTopics = this.doVerify(clientID, username, msg);
        MqttSubAckMessage ackMessage = this.doAckMessageFromValidateFilters(ackTopics, messageID);
        // Compare-and-set VERIFIED -> STORED; fails if the entry changed in the meantime.
        if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
            LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, messageId={}", (Object)clientID, (Object)messageID);
            return;
        }
        LOG.info("Creating and storing subscriptions CId={}, messageId={}, topics={}", new Object[]{clientID, messageID, ackTopics});
        List<Subscription> newSubscriptions = this.doStoreSubscription(ackTopics, clientID);
        for (Subscription subscription : newSubscriptions) {
            this.subscriptions.add(subscription);
        }
        LOG.info("Sending SUBACK response CId={}, messageId={}", (Object)clientID, (Object)messageID);
        channel.writeAndFlush((Object)ackMessage);
        // Retained messages are published only after the SUBACK has been written.
        for (Subscription subscription : newSubscriptions) {
            this.publishRetainedMessagesInSession(subscription, username);
        }
        // Remove the tracking entry only if it is still in the STORED state.
        boolean success = this.subscriptionInCourse.remove(executionKey, (Object)SubscriptionState.STORED);
        if (!success) {
            LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", (Object)clientID, (Object)messageID);
        }
    }

    /**
     * Creates a subscription for every verified topic whose granted QoS is not FAILURE and
     * registers it in the client's session.
     *
     * @param ackTopics verified topic subscriptions, including failed ones
     * @param clientID id of the subscribing client
     * @return the subscriptions that were created, in request order
     */
    private List<Subscription> doStoreSubscription(List<MqttTopicSubscription> ackTopics, String clientID) {
        final ClientSession session = this.sessionsRepository.sessionForClient(clientID);
        final List<Subscription> created = new ArrayList<>();
        for (MqttTopicSubscription verified : ackTopics) {
            final boolean rejected = verified.qualityOfService() == MqttQoS.FAILURE;
            if (!rejected) {
                final Subscription added =
                        new Subscription(clientID, new Topic(verified.topicName()), verified.qualityOfService());
                session.subscribe(added);
                created.add(added);
            }
        }
        return created;
    }

    /**
     * Checks every topic subscription of a SUBSCRIBE and returns one entry per request with
     * the QoS to grant: the requested QoS when the client may read the topic and the filter is
     * valid, FAILURE otherwise. Read permission is checked before filter validity.
     *
     * @param clientID id of the subscribing client
     * @param username username associated with the channel
     * @param msg the SUBSCRIBE message
     * @return the verified subscriptions, in request order
     */
    private List<MqttTopicSubscription> doVerify(String clientID, String username, MqttSubscribeMessage msg) {
        final ClientSession session = this.sessionsRepository.sessionForClient(clientID);
        final List<MqttTopicSubscription> verified = new ArrayList<>();
        final int messageId = Utils.messageId((MqttMessage) msg);

        for (MqttTopicSubscription request : msg.payload().topicSubscriptions()) {
            final Topic topic = new Topic(request.topicName());
            final MqttQoS granted;
            if (!this.m_authorizator.canRead(topic, username, session.clientID)) {
                LOG.error("Client does not have read permissions on the topic CId={}, username={}, messageId={}, topic={}",
                        clientID, username, messageId, topic);
                granted = MqttQoS.FAILURE;
            } else if (topic.isValid()) {
                LOG.info("Client will be subscribed to the topic CId={}, username={}, messageId={}, topic={}",
                        clientID, username, messageId, topic);
                granted = request.qualityOfService();
            } else {
                LOG.error("Topic filter is not valid CId={}, username={}, messageId={}, topic={}",
                        clientID, username, messageId, topic);
                granted = MqttQoS.FAILURE;
            }
            verified.add(new MqttTopicSubscription(topic.toString(), granted));
        }
        return verified;
    }

    /**
     * Builds the SUBACK message answering a set of verified topic filters.
     *
     * @param topicFilters the filters whose QoS values become the SUBACK payload, in order
     * @param messageId    the message identifier placed in the SUBACK variable header
     * @return a SUBACK with one QoS value per entry of {@code topicFilters}
     */
    private MqttSubAckMessage doAckMessageFromValidateFilters(List<MqttTopicSubscription> topicFilters, int messageId) {
        final List<Integer> qosValues = new ArrayList<>();
        for (MqttTopicSubscription filter : topicFilters) {
            final MqttQoS qos = filter.qualityOfService();
            qosValues.add(qos.value());
        }
        final MqttSubAckPayload ackPayload = new MqttSubAckPayload(qosValues);
        final MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(messageId);
        return new MqttSubAckMessage(
                new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                idHeader,
                ackPayload);
    }

    /**
     * Republishes the retained messages matching a new subscription to the subscribing client,
     * then notifies the interceptor that the topic was subscribed.
     *
     * @param newSubscription the subscription whose topic filter selects the retained messages
     * @param username        user name forwarded to the interceptor notification
     */
    private void publishRetainedMessagesInSession(Subscription newSubscription, String username) {
        final String subscriberId = newSubscription.getClientId();
        LOG.info("Retrieving retained messages CId={}, topics={}", subscriberId, newSubscription.getTopicFilter());
        final Collection<IMessagesStore.StoredMessage> retained =
                this.m_messagesStore.searchMatching(storedTopic -> storedTopic.match(newSubscription.getTopicFilter()));
        final boolean anyRetained = !retained.isEmpty();
        if (anyRetained) {
            LOG.info("Publishing retained messages CId={}, topics={}, messagesNo={}", subscriberId, newSubscription.getTopicFilter(), retained.size());
        }
        // The republisher is invoked even when the collection is empty.
        this.internalRepublisher.publishRetained(this.sessionsRepository.sessionForClient(newSubscription.getClientId()), retained);
        this.m_interceptor.notifyTopicSubscribed(newSubscription, username);
    }

    /**
     * Writes the queued messages of the channel's client to the channel.
     *
     * <p>Messages are polled from the client session and written one by one until either the
     * channel reports it is no longer writable or the session returns no further message.
     * The channel is flushed once at the end, whether or not anything was written.
     *
     * @param channel the channel to drain the client's queue into
     */
    public void notifyChannelWritable(Channel channel) {
        final ClientSession session = this.sessionsRepository.sessionForClient(NettyUtils.clientID(channel));
        boolean queueDrained = false;
        while (channel.isWritable() && !queueDrained) {
            final EnqueuedMessage next = session.poll();
            if (next != null) {
                channel.write(InternalRepublisher.createPublishForQos(
                        next.msg.getTopic(),
                        next.msg.getQos(),
                        next.msg.getPayload(),
                        next.msg.isRetained(),
                        next.messageId));
            } else {
                // Nothing left to send for this client.
                queueDrained = true;
            }
        }
        channel.flush();
    }

    /**
     * Registers an intercept handler by delegating to the broker interceptor.
     *
     * @param interceptHandler the handler to add
     */
    public void addInterceptHandler(InterceptHandler interceptHandler) {
        m_interceptor.addInterceptHandler(interceptHandler);
    }

    /**
     * Unregisters an intercept handler by delegating to the broker interceptor.
     *
     * @param interceptHandler the handler to remove
     */
    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        m_interceptor.removeInterceptHandler(interceptHandler);
    }

    /**
     * Returns the messages store held by this processor.
     *
     * @return the {@code m_messagesStore} field, as is
     */
    public IMessagesStore getMessagesStore() {
        return m_messagesStore;
    }

    /**
     * Returns the sessions store held by this processor.
     *
     * @return the {@code m_sessionsStore} field, as is
     */
    public ISessionsStore getSessionsStore() {
        return m_sessionsStore;
    }

    /**
     * Stops the broker interceptor, if one is set; otherwise does nothing.
     */
    public void shutdown() {
        if (this.m_interceptor == null) {
            return;
        }
        this.m_interceptor.stop();
    }

    private class RunningSubscription {
        final String clientID;
        final long packetId;

        RunningSubscription(String clientID, long packetId) {
            this.clientID = clientID;
            this.packetId = packetId;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            RunningSubscription that = (RunningSubscription)o;
            return this.packetId == that.packetId && (this.clientID != null ? this.clientID.equals(that.clientID) : that.clientID == null);
        }

        public int hashCode() {
            int result = this.clientID != null ? this.clientID.hashCode() : 0;
            result = 31 * result + (int)(this.packetId ^ this.packetId >>> 32);
            return result;
        }
    }

    private static enum SubscriptionState {
        STORED,
        VERIFIED;

    }

    /**
     * Immutable holder for the four attributes of a will message: topic, payload,
     * retained flag and QoS.
     *
     * <p>The payload buffer is stored and returned by reference, without copying.
     */
    static final class WillMessage {
        private final String topic;
        private final ByteBuffer payload;
        private final boolean retained;
        private final MqttQoS qos;

        /**
         * @param topic    the topic of the will message
         * @param payload  the payload buffer, kept by reference
         * @param retained the retained flag
         * @param qos      the QoS level
         */
        WillMessage(String topic, ByteBuffer payload, boolean retained, MqttQoS qos) {
            this.topic = topic;
            this.payload = payload;
            this.retained = retained;
            this.qos = qos;
        }

        /** @return the topic given at construction */
        public String getTopic() {
            return topic;
        }

        /** @return the same payload buffer instance given at construction */
        public ByteBuffer getPayload() {
            return payload;
        }

        /** @return the retained flag given at construction */
        public boolean isRetained() {
            return retained;
        }

        /** @return the QoS level given at construction */
        public MqttQoS getQos() {
            return qos;
        }
    }
}

