/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.server;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.MessageListener;
import io.moquette.connections.IConnectionsManager;
import io.moquette.interception.HazelcastInterceptHandler;
import io.moquette.interception.InterceptHandler;
import io.moquette.logging.LoggingUtils;
import io.moquette.server.DefaultMoquetteSslContextCreator;
import io.moquette.server.HazelcastListener;
import io.moquette.server.ServerAcceptor;
import io.moquette.server.config.FileResourceLoader;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import io.moquette.server.config.ResourceLoaderConfig;
import io.moquette.server.netty.NettyAcceptor;
import io.moquette.spi.impl.ProtocolProcessor;
import io.moquette.spi.impl.ProtocolProcessorBootstrapper;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.security.IAuthenticator;
import io.moquette.spi.security.IAuthorizator;
import io.moquette.spi.security.ISslContextCreator;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point and lifecycle holder for an embedded Moquette MQTT broker.
 *
 * <p>A {@code Server} is started through one of the {@code startServer} overloads, which all
 * funnel into {@link #startServer(IConfig, List, ISslContextCreator, IAuthenticator, IAuthorizator)},
 * and is torn down with {@link #stopServer()}. When the configured intercept handler is
 * {@link HazelcastInterceptHandler}, an embedded Hazelcast instance is started as well and a
 * {@link HazelcastListener} is registered on the {@code "moquette"} topic.
 */
public class Server {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    private static final String HZ_INTERCEPT_HANDLER = HazelcastInterceptHandler.class.getCanonicalName();
    private ServerAcceptor m_acceptor;
    // volatile: read by the publish/interceptor entry points, written by start/stop.
    private volatile boolean m_initialized;
    private ProtocolProcessor m_processor;
    private HazelcastInstance hazelcastInstance;
    private ProtocolProcessorBootstrapper m_processorBootstrapper;
    private ScheduledExecutorService scheduler;

    /**
     * Starts a broker using the default configuration file and registers a JVM shutdown hook
     * that stops it.
     *
     * @param args ignored
     * @throws IOException if the server cannot be started
     */
    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.startServer();
        System.out.println("Server started, version 0.11-SNAPSHOT");
        Runtime.getRuntime().addShutdownHook(new Thread(server::stopServer));
    }

    /**
     * Starts the server with the configuration file {@code config/moquette.conf}, resolved
     * against the {@code moquette.path} system property.
     *
     * @throws IOException if the server cannot be started
     */
    public void startServer() throws IOException {
        File defaultConfigurationFile = Server.defaultConfigFile();
        LOG.info("Starting Moquette server. Configuration file path={}", defaultConfigurationFile.getAbsolutePath());
        this.startServer(new ResourceLoaderConfig(new FileResourceLoader(defaultConfigurationFile)));
    }

    /**
     * Builds the default configuration file location.
     *
     * @return {@code config/moquette.conf} under the directory named by the {@code moquette.path}
     *         system property; when the property is unset the parent is {@code null}, so the
     *         path is the plain relative path {@code config/moquette.conf}
     */
    private static File defaultConfigFile() {
        String configPath = System.getProperty("moquette.path", null);
        return new File(configPath, "config/moquette.conf");
    }

    /**
     * Starts the server with the given configuration file.
     *
     * @param configFile configuration file to load
     * @throws IOException if the server cannot be started
     */
    public void startServer(File configFile) throws IOException {
        LOG.info("Starting Moquette server. Configuration file path={}", configFile.getAbsolutePath());
        this.startServer(new ResourceLoaderConfig(new FileResourceLoader(configFile)));
    }

    /**
     * Starts the server with configuration taken from a {@link Properties} object.
     *
     * @param configProps configuration properties
     * @throws IOException if the server cannot be started
     */
    public void startServer(Properties configProps) throws IOException {
        LOG.info("Starting Moquette server using properties object");
        this.startServer(new MemoryConfig(configProps));
    }

    /**
     * Starts the server with the given configuration and no intercept handlers.
     *
     * @param config server configuration
     * @throws IOException if the server cannot be started
     */
    public void startServer(IConfig config) throws IOException {
        LOG.info("Starting Moquette server using IConfig instance...");
        this.startServer(config, null);
    }

    /**
     * Starts the server with the given configuration and intercept handlers, using the default
     * SSL context creator and passing no authenticator or authorizator.
     *
     * @param config server configuration
     * @param handlers intercept handlers; {@code null} is treated as an empty list
     * @throws IOException if the server cannot be started
     */
    public void startServer(IConfig config, List<? extends InterceptHandler> handlers) throws IOException {
        LOG.info("Starting moquette server using IConfig instance and intercept handlers");
        this.startServer(config, handlers, null, null, null);
    }

    /**
     * Starts the server: creates the scheduler, optionally starts the embedded Hazelcast
     * instance, initializes the protocol processor and finally initializes the acceptor.
     *
     * <p>If any step fails, the scheduler and the Hazelcast instance created by this call are
     * shut down before the exception is rethrown, so a failed start does not leave them running.
     *
     * @param config server configuration; the {@code intercept.handler} system property, when
     *        set, overrides the same-named configuration property
     * @param handlers intercept handlers; {@code null} is treated as an empty list
     * @param sslCtxCreator SSL context creator; {@code null} selects
     *        {@link DefaultMoquetteSslContextCreator}
     * @param authenticator passed through to the protocol processor bootstrapper; may be {@code null}
     * @param authorizator passed through to the protocol processor bootstrapper; may be {@code null}
     * @throws IOException if the server cannot be started
     */
    public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator, IAuthenticator authenticator, IAuthorizator authorizator) throws IOException {
        if (handlers == null) {
            handlers = Collections.emptyList();
        }
        LOG.info("Starting Moquette Server. MQTT message interceptors={}", LoggingUtils.getInterceptorIds(handlers));
        this.scheduler = Executors.newScheduledThreadPool(1);
        try {
            String handlerProp = System.getProperty("intercept.handler");
            if (handlerProp != null) {
                config.setProperty("intercept.handler", handlerProp);
            }
            this.configureCluster(config);
            String persistencePath = config.getProperty("persistent_store");
            LOG.info("Configuring Using persistent store file, path={}", persistencePath);
            this.m_processorBootstrapper = new ProtocolProcessorBootstrapper();
            ProtocolProcessor processor = this.m_processorBootstrapper.init(config, handlers, authenticator, authorizator, this);
            LOG.info("Initialized MQTT protocol processor");
            if (sslCtxCreator == null) {
                LOG.warn("Using default SSL context creator");
                sslCtxCreator = new DefaultMoquetteSslContextCreator(config);
            }
            LOG.info("Binding server to the configured ports");
            this.m_acceptor = new NettyAcceptor();
            this.m_acceptor.initialize(processor, config, sslCtxCreator);
            this.m_processor = processor;
            LOG.info("Moquette server has been initialized successfully");
            this.m_initialized = true;
        } catch (RuntimeException | IOException e) {
            // Release what this call created directly; otherwise a failed start would leave
            // the Hazelcast instance and the scheduler alive with no owner to stop them.
            LOG.warn("Moquette server failed to start, releasing scheduler and Hazelcast instance");
            this.shutdownHazelcast();
            this.scheduler.shutdown();
            throw e;
        }
    }

    /**
     * Starts the embedded Hazelcast instance when the configured {@code intercept.handler} is
     * {@link HazelcastInterceptHandler}; otherwise does nothing.
     *
     * <p>The Hazelcast configuration named by the {@code hazelcast.configuration} property is
     * loaded from the classpath when such a resource exists, and from the file system otherwise.
     * Without that property the Hazelcast default configuration is used.
     *
     * @param config server configuration
     * @throws FileNotFoundException if the Hazelcast configuration file cannot be found on the
     *         file system
     */
    private void configureCluster(IConfig config) throws FileNotFoundException {
        LOG.info("Configuring embedded Hazelcast instance");
        String interceptHandlerClassname = config.getProperty("intercept.handler");
        // equals() on the constant is already false for a null argument.
        if (!HZ_INTERCEPT_HANDLER.equals(interceptHandlerClassname)) {
            LOG.info("There are no Hazelcast intercept handlers. The server won't start a Hazelcast instance.");
            return;
        }
        String hzConfigPath = config.getProperty("hazelcast.configuration");
        if (hzConfigPath != null) {
            boolean isHzConfigOnClasspath = this.getClass().getClassLoader().getResource(hzConfigPath) != null;
            // Declared as the common supertype: the two branches yield different Config subclasses.
            Config hzconfig = isHzConfigOnClasspath ? new ClasspathXmlConfig(hzConfigPath) : new FileSystemXmlConfig(hzConfigPath);
            LOG.info("Starting Hazelcast instance. ConfigurationFile={}", hzconfig);
            this.hazelcastInstance = Hazelcast.newHazelcastInstance(hzconfig);
        } else {
            LOG.info("Starting Hazelcast instance with default configuration");
            this.hazelcastInstance = Hazelcast.newHazelcastInstance();
        }
        this.listenOnHazelCastMsg();
    }

    /**
     * Registers a {@link HazelcastListener} bound to this server on the {@code "moquette"}
     * Hazelcast topic.
     */
    private void listenOnHazelCastMsg() {
        String topicName = "moquette";
        LOG.info("Subscribing to Hazelcast topic. TopicName={}", topicName);
        HazelcastInstance hz = this.getHazelcastInstance();
        // NOTE(review): raw ITopic kept as in the original; the listener's message type is
        // declared outside this file.
        ITopic topic = hz.getTopic(topicName);
        topic.addMessageListener((MessageListener)new HazelcastListener(this));
    }

    /**
     * @return the embedded Hazelcast instance, or {@code null} if none was started
     */
    public HazelcastInstance getHazelcastInstance() {
        return this.hazelcastInstance;
    }

    /**
     * Publishes a message through the protocol processor on behalf of the given client id.
     *
     * @param msg message to publish
     * @param clientId client id passed to the processor together with the message
     * @throws IllegalStateException if the server is not started
     */
    public void internalPublish(MqttPublishMessage msg, String clientId) {
        int messageID = msg.variableHeader().messageId();
        if (!this.m_initialized) {
            LOG.error("Moquette is not started, internal message cannot be published. CId={}, messageId={}", clientId, messageID);
            throw new IllegalStateException("Can't publish on a server is not yet started");
        }
        LOG.debug("Publishing message. CId={}, messageId={}", clientId, messageID);
        this.m_processor.internalPublish(msg, clientId);
    }

    /**
     * Stops the server: closes the acceptor, shuts down the protocol processor bootstrapper,
     * then shuts down the embedded Hazelcast instance (if any) and the scheduler.
     *
     * <p>Safe to call on a server that was never started; components that were not created are
     * skipped. Hazelcast and the scheduler are released even if closing the acceptor or the
     * bootstrapper throws.
     */
    public void stopServer() {
        try {
            LOG.info("Unbinding server from the configured ports");
            if (this.m_acceptor != null) {
                this.m_acceptor.close();
            }
            LOG.trace("Stopping MQTT protocol processor");
            if (this.m_processorBootstrapper != null) {
                this.m_processorBootstrapper.shutdown();
            }
        } finally {
            this.m_initialized = false;
            this.shutdownHazelcast();
            if (this.scheduler != null) {
                this.scheduler.shutdown();
            }
        }
        LOG.info("Moquette server has been stopped.");
    }

    /**
     * Shuts down the embedded Hazelcast instance if one exists, tolerating an instance that is
     * already inactive.
     */
    private void shutdownHazelcast() {
        if (this.hazelcastInstance == null) {
            return;
        }
        LOG.trace("Stopping embedded Hazelcast instance");
        try {
            this.hazelcastInstance.shutdown();
        }
        catch (HazelcastInstanceNotActiveException e) {
            LOG.warn("embedded Hazelcast instance is already shut down.");
        }
    }

    /**
     * @return the subscriptions reported by the protocol processor bootstrapper, or {@code null}
     *         if the server has not been started
     */
    public List<Subscription> getSubscriptions() {
        if (this.m_processorBootstrapper == null) {
            return null;
        }
        return this.m_processorBootstrapper.getSubscriptions();
    }

    /**
     * Registers an intercept handler on the running protocol processor.
     *
     * @param interceptHandler handler to add
     * @throws IllegalStateException if the server is not started
     */
    public void addInterceptHandler(InterceptHandler interceptHandler) {
        if (!this.m_initialized) {
            LOG.error("Moquette is not started, MQTT message interceptor cannot be added. InterceptorId={}", interceptHandler.getID());
            throw new IllegalStateException("Can't register interceptors on a server that is not yet started");
        }
        LOG.info("Adding MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
        this.m_processor.addInterceptHandler(interceptHandler);
    }

    /**
     * Removes an intercept handler from the running protocol processor.
     *
     * @param interceptHandler handler to remove
     * @throws IllegalStateException if the server is not started
     */
    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        if (!this.m_initialized) {
            LOG.error("Moquette is not started, MQTT message interceptor cannot be removed. InterceptorId={}", interceptHandler.getID());
            throw new IllegalStateException("Can't deregister interceptors from a server that is not yet started");
        }
        LOG.info("Removing MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
        this.m_processor.removeInterceptHandler(interceptHandler);
    }

    /**
     * @return the connection descriptors held by the protocol processor bootstrapper, or
     *         {@code null} if the server has not been started (mirrors {@link #getSubscriptions()})
     */
    public IConnectionsManager getConnectionsManager() {
        if (this.m_processorBootstrapper == null) {
            return null;
        }
        return this.m_processorBootstrapper.getConnectionDescriptors();
    }

    /**
     * @return the protocol processor, or {@code null} if the server has not completed a start
     */
    public ProtocolProcessor getProcessor() {
        return this.m_processor;
    }

    /**
     * @return the scheduler created by the last start, or {@code null} if the server was never
     *         started
     */
    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }
}

