/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.spi;

import io.moquette.spi.ClientSession;
import io.moquette.spi.EnqueuedMessage;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.ISubscriptionsStore;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.Topic;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSession
extends ClientSession {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSession.class);
    private final ISessionsStore sessionsStore;
    private final ISubscriptionsStore subscriptionsStore;
    private final OutboundFlightZone outboundFlightZone;
    private final InboundFlightZone inboundFlightZone;

    public DurableSession(String clientID, ISessionsStore sessions, ISubscriptionsStore subscriptionsStore) {
        super(clientID);
        this.subscriptionsStore = subscriptionsStore;
        this.sessionsStore = sessions;
        this.outboundFlightZone = new OutboundFlightZone();
        this.inboundFlightZone = new InboundFlightZone();
    }

    public void reloadAllSubscriptionsFromStore() {
        Collection<Subscription> reloadedSubscriptions = this.subscriptionsStore.listClientSubscriptions(this.clientID);
        this.subscriptions.addAll(reloadedSubscriptions);
    }

    @Override
    public boolean isCleanSession() {
        return false;
    }

    @Override
    public boolean subscribe(Subscription newSubscription) {
        LOG.info("Adding new subscription. CId={}, topics={}, qos={}", new Object[]{newSubscription.getClientId(), newSubscription.getTopicFilter(), newSubscription.getRequestedQos()});
        boolean validTopic = newSubscription.getTopicFilter().isValid();
        if (!validTopic) {
            LOG.warn("The topic filter is not valid. CId={}, topics={}", (Object)newSubscription.getClientId(), (Object)newSubscription.getTopicFilter());
            return false;
        }
        Subscription existingSub = this.subscriptionsStore.reload(newSubscription);
        if (existingSub == null || existingSub.qosLessThan(newSubscription)) {
            if (existingSub != null) {
                LOG.info("Subscription already existed with a lower QoS value. It will be updated. CId={}, topics={}, existingQos={}, newQos={}", new Object[]{newSubscription.getClientId(), newSubscription.getTopicFilter(), existingSub.getRequestedQos(), newSubscription.getRequestedQos()});
                this.subscriptions.remove(newSubscription);
            }
            this.subscriptions.add(newSubscription);
            this.subscriptionsStore.addNewSubscription(newSubscription);
        }
        return true;
    }

    @Override
    public void unsubscribeFrom(Topic topicFilter) {
        LOG.info("Removing subscription. CId={}, topics={}", (Object)this.clientID, (Object)topicFilter);
        this.subscriptionsStore.removeSubscription(topicFilter, this.clientID);
        HashSet<Subscription> subscriptionsToRemove = new HashSet<Subscription>();
        for (Subscription sub : this.subscriptions) {
            if (!sub.getTopicFilter().equals(topicFilter)) continue;
            subscriptionsToRemove.add(sub);
        }
        this.subscriptions.removeAll(subscriptionsToRemove);
    }

    @Override
    public void cleanSession() {
        this.sessionsStore.removeTemporaryQoS2(this.clientID);
        LOG.info("Wiping existing subscriptions. ClientId={}", (Object)this.clientID);
        this.subscriptionsStore.wipeSubscriptions(this.clientID);
        LOG.info("Removing queues. ClientId={}", (Object)this.clientID);
        this.sessionsStore.dropQueue(this.clientID);
    }

    @Override
    public void disconnect() {
        LOG.info("Client disconnected. Removing its subscriptions. CId={}", (Object)this.clientID);
        this.cleanSession();
    }

    @Override
    protected int nextPacketId() {
        return this.sessionsStore.nextPacketID(this.clientID);
    }

    @Override
    public IMessagesStore.StoredMessage inFlightAcknowledged(int messageID) {
        return this.outboundFlightZone.acknowledged(messageID);
    }

    @Override
    public int inFlightAckWaiting(IMessagesStore.StoredMessage msg) {
        LOG.debug("Adding message to inflight zone. CId={}", (Object)this.clientID);
        int messageId = this.nextPacketId();
        this.outboundFlightZone.waitingAck(messageId, msg);
        return messageId;
    }

    @Override
    public IMessagesStore.StoredMessage inboundInflight(int messageID) {
        return this.inboundFlightZone.lookup(messageID);
    }

    @Override
    public void markAsInboundInflight(int messageID, IMessagesStore.StoredMessage msg) {
        this.inboundFlightZone.waitingRel(messageID, msg);
    }

    @Override
    public void moveInFlightToSecondPhaseAckWaiting(int messageID, IMessagesStore.StoredMessage msg) {
        this.sessionsStore.moveInFlightToSecondPhaseAckWaiting(this.clientID, messageID, msg);
    }

    @Override
    public boolean isEmptyQueue() {
        return this.sessionsStore.queue(this.clientID).isEmpty();
    }

    @Override
    public void dropQueue() {
        LOG.debug("Removing messages of session. CId={}", (Object)this.clientID);
        this.sessionsStore.dropQueue(this.clientID);
        LOG.debug("Messages of the session have been removed. CId={}", (Object)this.clientID);
    }

    @Override
    public EnqueuedMessage poll() {
        IMessagesStore.StoredMessage msg = this.sessionsStore.queue(this.clientID).poll();
        if (msg == null) {
            return null;
        }
        int messageId = this.inFlightAckWaiting(msg);
        return new EnqueuedMessage(msg, messageId);
    }

    @Override
    public void enqueue(IMessagesStore.StoredMessage message) {
        this.sessionsStore.queue(this.clientID).add(message);
    }

    @Override
    public IMessagesStore.StoredMessage completeReleasedPublish(int messageID) {
        return this.sessionsStore.completeReleasedPublish(this.clientID, messageID);
    }

    @Override
    public int getPendingPublishMessagesNo() {
        return this.sessionsStore.queue(this.clientID).size();
    }

    @Override
    public int countPubReleaseWaitingPubComplete() {
        return this.sessionsStore.countPubReleaseWaitingPubComplete(this.clientID);
    }

    @Override
    public int getInflightMessagesNo() {
        return this.sessionsStore.getInflightMessagesNo(this.clientID);
    }

    class InboundFlightZone {
        final Map<Integer, IMessagesStore.StoredMessage> inboundFlightMessages = new ConcurrentHashMap<Integer, IMessagesStore.StoredMessage>();

        InboundFlightZone() {
        }

        IMessagesStore.StoredMessage lookup(int messageID) {
            return this.inboundFlightMessages.get(messageID);
        }

        void waitingRel(int messageID, IMessagesStore.StoredMessage msg) {
            this.inboundFlightMessages.put(messageID, msg);
        }
    }

    class OutboundFlightZone {
        OutboundFlightZone() {
        }

        void waitingAck(int messageID, IMessagesStore.StoredMessage msg) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Adding to inflight {}, guid <{}>", (Object)messageID, (Object)msg.getGuid());
            }
            DurableSession.this.sessionsStore.inFlight(DurableSession.this.clientID, messageID, msg);
        }

        IMessagesStore.StoredMessage acknowledged(int messageID) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Acknowledging inflight, clientID <{}> messageID {}", (Object)DurableSession.this.clientID, (Object)messageID);
            }
            return DurableSession.this.sessionsStore.inFlightAck(DurableSession.this.clientID, messageID);
        }
    }
}

