/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.broker;

import io.moquette.broker.Authorizator;
import io.moquette.broker.BrokerConfiguration;
import io.moquette.broker.ClientDescriptor;
import io.moquette.broker.DefaultMoquetteSslContextCreator;
import io.moquette.broker.IQueueRepository;
import io.moquette.broker.IRetainedRepository;
import io.moquette.broker.ISessionsRepository;
import io.moquette.broker.ISslContextCreator;
import io.moquette.broker.ISubscriptionsRepository;
import io.moquette.broker.MQTTConnectionFactory;
import io.moquette.broker.MemoryQueueRepository;
import io.moquette.broker.MemoryRetainedRepository;
import io.moquette.broker.NewNettyAcceptor;
import io.moquette.broker.NewNettyMQTTHandler;
import io.moquette.broker.PostOffice;
import io.moquette.broker.RoutingResults;
import io.moquette.broker.SessionEventLoopGroup;
import io.moquette.broker.SessionRegistry;
import io.moquette.broker.config.FileResourceLoader;
import io.moquette.broker.config.FluentConfig;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.IResourceLoader;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.broker.config.ResourceLoaderConfig;
import io.moquette.broker.security.ACLFileParser;
import io.moquette.broker.security.AcceptAllAuthenticator;
import io.moquette.broker.security.DenyAllAuthorizatorPolicy;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.moquette.broker.security.PermitAllAuthorizatorPolicy;
import io.moquette.broker.security.ResourceAuthenticator;
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory;
import io.moquette.broker.unsafequeues.QueueException;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.logging.LoggingUtils;
import io.moquette.persistence.H2Builder;
import io.moquette.persistence.MemorySessionsRepository;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.moquette.persistence.SegmentQueueRepository;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.ParseException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    public static final String MOQUETTE_VERSION = "0.17";
    private ScheduledExecutorService scheduler;
    private NewNettyAcceptor acceptor;
    private volatile boolean initialized;
    private PostOffice dispatcher;
    private BrokerInterceptor interceptor;
    private H2Builder h2Builder;
    private SessionRegistry sessions;
    private boolean standalone = false;

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        try {
            server.startStandaloneServer();
        }
        catch (RuntimeException e) {
            System.exit(1);
        }
        System.out.println("Server started, version 0.17");
        Runtime.getRuntime().addShutdownHook(new Thread(server::stopServer));
    }

    public void startServer() throws IOException {
        File defaultConfigurationFile = Server.defaultConfigFile();
        LOG.info("Starting Moquette integration. Configuration file path={}", (Object)defaultConfigurationFile.getAbsolutePath());
        FileResourceLoader filesystemLoader = new FileResourceLoader(defaultConfigurationFile);
        ResourceLoaderConfig config = new ResourceLoaderConfig(filesystemLoader);
        this.startServer(config);
    }

    private void startStandaloneServer() throws IOException {
        this.standalone = true;
        this.startServer();
    }

    private static File defaultConfigFile() {
        String configPath = System.getProperty("moquette.path", null);
        return new File(configPath, "config/moquette.conf");
    }

    public void startServer(File configFile) throws IOException {
        LOG.info("Starting Moquette integration. Configuration file path: {}", (Object)configFile.getAbsolutePath());
        FileResourceLoader filesystemLoader = new FileResourceLoader(configFile);
        ResourceLoaderConfig config = new ResourceLoaderConfig(filesystemLoader);
        this.startServer(config);
    }

    public void startServer(Properties configProps) throws IOException {
        LOG.debug("Starting Moquette integration using properties object");
        MemoryConfig config = new MemoryConfig(configProps);
        this.startServer(config);
    }

    public void startServer(IConfig config) throws IOException {
        LOG.debug("Starting Moquette integration using IConfig instance");
        this.startServer(config, null);
    }

    public void startServer(IConfig config, List<? extends InterceptHandler> handlers) throws IOException {
        LOG.debug("Starting moquette integration using IConfig instance and intercept handlers");
        this.startServer(config, handlers, null, null, null);
    }

    public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator, IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) throws IOException {
        ISessionsRepository sessionsRepository;
        IRetainedRepository retainedRepository;
        ISubscriptionsRepository subscriptionsRepository;
        IQueueRepository queueRepository;
        Object dataPath;
        long start = System.currentTimeMillis();
        if (handlers == null) {
            handlers = Collections.emptyList();
        }
        LOG.trace("Starting Moquette Server. MQTT message interceptors={}", LoggingUtils.getInterceptorIds(handlers));
        this.scheduler = Executors.newScheduledThreadPool(1);
        String handlerProp = System.getProperty("intercept.handler");
        if (handlerProp != null) {
            config.setProperty("intercept.handler", handlerProp);
        }
        this.initInterceptors(config, handlers);
        LOG.debug("Initialized MQTT protocol processor");
        if (sslCtxCreator == null) {
            LOG.info("Using default SSL context creator");
            sslCtxCreator = new DefaultMoquetteSslContextCreator(config);
        }
        authenticator = this.initializeAuthenticator(authenticator, config);
        authorizatorPolicy = this.initializeAuthorizatorPolicy(authorizatorPolicy, config);
        if (config.getProperty("persistent_store") != null) {
            LOG.warn("Using a deprecated setting {} please update to {}", (Object)"persistent_store", (Object)"data_path");
            LOG.warn("Forcing {} to true", (Object)"persistence_enabled");
            config.setProperty("persistence_enabled", Boolean.TRUE.toString());
            String persistencePath = config.getProperty("persistent_store");
            dataPath = persistencePath.substring(0, persistencePath.lastIndexOf("/"));
            LOG.warn("Forcing {} to {}", (Object)"data_path", dataPath);
            config.setProperty("data_path", (String)dataPath);
        }
        Clock clock = Clock.systemDefaultZone();
        if (Boolean.parseBoolean(config.getProperty("persistence_enabled"))) {
            dataPath = Paths.get(config.getProperty("data_path"), new String[0]);
            if (!dataPath.toFile().exists()) {
                if (dataPath.toFile().mkdirs()) {
                    LOG.debug("Created data_path {} folder", dataPath);
                } else {
                    LOG.warn("Impossible to create the data_path {}", dataPath);
                }
            }
            LOG.debug("Configuring persistent subscriptions store and queues, path: {}", dataPath);
            int autosaveInterval = Integer.parseInt(config.getProperty("autosave_interval", "30"));
            this.h2Builder = new H2Builder(this.scheduler, (Path)dataPath, autosaveInterval, clock).initStore();
            queueRepository = Server.initQueuesRepository(config, (Path)dataPath, this.h2Builder);
            LOG.trace("Configuring H2 subscriptions repository");
            subscriptionsRepository = this.h2Builder.subscriptionsRepository();
            retainedRepository = this.h2Builder.retainedRepository();
            sessionsRepository = this.h2Builder.sessionsRepository();
        } else {
            LOG.trace("Configuring in-memory subscriptions store");
            subscriptionsRepository = new MemorySubscriptionsRepository();
            queueRepository = new MemoryQueueRepository();
            retainedRepository = new MemoryRetainedRepository();
            sessionsRepository = new MemorySessionsRepository();
        }
        CTrieSubscriptionDirectory subscriptions = new CTrieSubscriptionDirectory();
        subscriptions.init(subscriptionsRepository);
        Authorizator authorizator = new Authorizator(authorizatorPolicy);
        int globalSessionExpiry = config.getProperty("persistent_client_expiration") != null ? (int)config.durationProp("persistent_client_expiration").toMillis() / 1000 : Integer.MAX_VALUE;
        int sessionQueueSize = config.intProp("session_queue_size", 1024);
        SessionEventLoopGroup loopsGroup = new SessionEventLoopGroup(this.interceptor, sessionQueueSize);
        this.sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator, this.scheduler, clock, globalSessionExpiry, loopsGroup);
        this.dispatcher = new PostOffice(subscriptions, retainedRepository, this.sessions, this.interceptor, authorizator, loopsGroup);
        BrokerConfiguration brokerConfig = new BrokerConfiguration(config);
        MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, this.sessions, this.dispatcher);
        NewNettyMQTTHandler mqttHandler = new NewNettyMQTTHandler(connectionFactory);
        this.acceptor = new NewNettyAcceptor();
        this.acceptor.initialize(mqttHandler, config, sslCtxCreator, brokerConfig);
        long startTime = System.currentTimeMillis() - start;
        LOG.info("Moquette integration has been started successfully in {} ms", (Object)startTime);
        if (config.boolProp("telemetry_enabled", true)) {
            this.collectAndSendTelemetryDataAsynch(config);
        }
        this.initialized = true;
    }

    private static IQueueRepository initQueuesRepository(IConfig config, Path dataPath, H2Builder h2Builder) throws IOException {
        IQueueRepository queueRepository;
        String queueType = config.getProperty("persistent_queue_type");
        if ("h2".equalsIgnoreCase(queueType)) {
            LOG.info("Configuring H2 queue store");
            queueRepository = h2Builder.queueRepository();
        } else if ("segmented".equalsIgnoreCase(queueType)) {
            LOG.info("Configuring segmented queue store to {}", (Object)dataPath);
            int pageSize = config.intProp("queue_page_size", 0x4000000);
            int segmentSize = config.intProp("queue_segment_size", 0x400000);
            try {
                queueRepository = new SegmentQueueRepository(dataPath, pageSize, segmentSize);
            }
            catch (QueueException e) {
                throw new IOException("Problem in configuring persistent queue on path " + dataPath, e);
            }
        } else {
            String errMsg = String.format("Invalid property for %s found [%s] while only h2 or segmented are admitted", "persistent_queue_type", queueType);
            throw new RuntimeException(errMsg);
        }
        return queueRepository;
    }

    private void collectAndSendTelemetryDataAsynch(IConfig config) {
        Thread telCollector = new Thread(() -> this.collectAndSendTelemetryData(config));
        telCollector.start();
    }

    private void collectAndSendTelemetryData(IConfig config) {
        block2: {
            String uuid = this.checkOrCreateUUID(config);
            String telemetryDoc = this.collectTelemetryData(uuid);
            try {
                this.sendTelemetryData(telemetryDoc);
            }
            catch (IOException e) {
                LOG.info("Can't reach the telemetry collector");
                if (!LOG.isDebugEnabled()) break block2;
                LOG.debug("Original exception", (Throwable)e);
            }
        }
    }

    private String checkOrCreateUUID(IConfig config) {
        String storagePath = config.getProperty("data_path", "");
        Path uuidFilePath = Paths.get(storagePath, ".moquette_uuid");
        if (Files.exists(uuidFilePath, new LinkOption[0])) {
            try {
                return new String(Files.readAllBytes(uuidFilePath), StandardCharsets.UTF_8);
            }
            catch (IOException e) {
                LOG.error("Problem accessing file path: {}", (Object)uuidFilePath, (Object)e);
            }
        }
        UUID uuid = UUID.randomUUID();
        try {
            FileWriter f = new FileWriter(uuidFilePath.toFile(), false);
            f.write(uuid.toString());
            f.close();
        }
        catch (IOException e) {
            LOG.error("Problem writing new UUID to file path: {}", (Object)uuidFilePath, (Object)e);
        }
        return uuid.toString();
    }

    private String collectTelemetryData(String uuid) {
        String remoteIp = "uncollected";
        String os = System.getProperty("os.name");
        String cpuArch = System.getProperty("os.arch");
        String jvmVersion = System.getProperty("java.specification.version");
        String jvmVendor = System.getProperty("java.vendor");
        long maxMemory = Runtime.getRuntime().maxMemory();
        String maxHeap = maxMemory == Long.MAX_VALUE ? "undefined" : Long.toString(maxMemory);
        return String.format("{\"os\": \"%s\", \"cpu_arch\": \"%s\", \"jvm_version\": \"%s\", \"jvm_vendor\": \"%s\", \"broker_version\": \"%s\", \"standalone\": %s,\"max_heap\": \"%s\", \"remote_ip\": \"%s\", \"uuid\": \"%s\"}", os, cpuArch, jvmVersion, jvmVendor, MOQUETTE_VERSION, this.standalone, maxHeap, "uncollected", uuid);
    }

    private String retrievePublicIP() {
        try {
            URL url = new URL("http://whatismyip.akamai.com");
            HttpURLConnection con = (HttpURLConnection)url.openConnection();
            int status = con.getResponseCode();
            if (status != 200) {
                LOG.debug("What's my IP service replied with {}", (Object)status);
                return "";
            }
            BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
            return in.readLine();
        }
        catch (Exception e) {
            LOG.debug("Can't connect to what's my IP service");
            return "";
        }
    }

    private void sendTelemetryData(String telemetryDoc) throws IOException {
        String inputLine;
        URL url = new URL("https://telemetry.moquette.io/api/v1/notify");
        HttpURLConnection con = (HttpURLConnection)url.openConnection();
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type", "application/json");
        con.setRequestProperty("Accept", "application/json");
        con.setInstanceFollowRedirects(true);
        con.setDoOutput(true);
        byte[] input = telemetryDoc.getBytes("utf-8");
        try (OutputStream os = con.getOutputStream();){
            os.write(input, 0, input.length);
        }
        int status = con.getResponseCode();
        LOG.trace("Response code is {}", (Object)status);
        boolean redirect = false;
        if (status != 200 && (status == 302 || status == 301 || status == 303)) {
            redirect = true;
        }
        LOG.trace("Response Code: {} ", (Object)status);
        if (redirect) {
            String newUrl = con.getHeaderField("Location");
            con = (HttpURLConnection)new URL(newUrl).openConnection();
            con.addRequestProperty("Accept-Language", "en-US,en;q=0.8");
            con.addRequestProperty("User-Agent", "Mozilla");
            con.addRequestProperty("Referer", "google.com");
            con.setRequestMethod("POST");
            con.setDoOutput(true);
            try (OutputStream os = con.getOutputStream();){
                os.write(input, 0, input.length);
            }
            LOG.trace("Redirect to URL: {}", (Object)newUrl);
        }
        BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
        StringBuffer content = new StringBuffer();
        while ((inputLine = in.readLine()) != null) {
            content.append(inputLine);
        }
        in.close();
        LOG.trace("Content: {}", (Object)content);
        con.disconnect();
    }

    private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy authorizatorPolicy, IConfig props) {
        LOG.debug("Configuring MQTT authorizator policy");
        String authorizatorClassName = props.getProperty("authorizator_class", "");
        if (authorizatorPolicy == null && !authorizatorClassName.isEmpty()) {
            authorizatorPolicy = this.loadClass(authorizatorClassName, IAuthorizatorPolicy.class, IConfig.class, props);
        }
        if (authorizatorPolicy == null) {
            String aclFilePath = props.getProperty("acl_file", "");
            if (aclFilePath != null && !aclFilePath.isEmpty()) {
                authorizatorPolicy = new DenyAllAuthorizatorPolicy();
                try {
                    LOG.info("Parsing ACL file. Path = {}", (Object)aclFilePath);
                    IResourceLoader resourceLoader = props.getResourceLoader();
                    authorizatorPolicy = ACLFileParser.parse(resourceLoader.loadResource(aclFilePath));
                }
                catch (ParseException pex) {
                    LOG.error("Unable to parse ACL file. path = {}", (Object)aclFilePath, (Object)pex);
                }
            } else {
                authorizatorPolicy = new PermitAllAuthorizatorPolicy();
            }
            LOG.info("Authorizator policy {} instance will be used", (Object)authorizatorPolicy.getClass().getName());
        }
        return authorizatorPolicy;
    }

    private IAuthenticator initializeAuthenticator(IAuthenticator authenticator, IConfig props) {
        LOG.debug("Configuring MQTT authenticator");
        String authenticatorClassName = props.getProperty("authenticator_class", "");
        if (authenticator == null && !authenticatorClassName.isEmpty()) {
            authenticator = this.loadClass(authenticatorClassName, IAuthenticator.class, IConfig.class, props);
        }
        IResourceLoader resourceLoader = props.getResourceLoader();
        if (authenticator == null) {
            String passwdPath = props.getProperty("password_file", "");
            authenticator = passwdPath.isEmpty() ? new AcceptAllAuthenticator() : new ResourceAuthenticator(resourceLoader, passwdPath);
            LOG.info("An {} authenticator instance will be used", (Object)authenticator.getClass().getName());
        }
        return authenticator;
    }

    private void initInterceptors(IConfig props, List<? extends InterceptHandler> embeddedObservers) {
        InterceptHandler handler;
        LOG.info("Configuring message interceptors...");
        ArrayList<InterceptHandler> observers = new ArrayList<InterceptHandler>(embeddedObservers);
        String interceptorClassName = props.getProperty("intercept.handler");
        if (interceptorClassName != null && !interceptorClassName.isEmpty() && (handler = this.loadClass(interceptorClassName, InterceptHandler.class, Server.class, this)) != null) {
            observers.add(handler);
        }
        this.interceptor = new BrokerInterceptor(props, observers);
    }

    private <T, U> T loadClass(String className, Class<T> intrface, Class<U> constructorArgClass, U props) {
        T instance = null;
        try {
            LOG.info("Invoking constructor with {} argument. ClassName={}, interfaceName={}", new Object[]{constructorArgClass.getName(), className, intrface.getName()});
            instance = this.getClass().getClassLoader().loadClass(className).asSubclass(intrface).getConstructor(constructorArgClass).newInstance(props);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException ex) {
            LOG.warn("Unable to invoke constructor with {} argument. ClassName={}, interfaceName={}, cause={}, errorMessage={}", new Object[]{constructorArgClass.getName(), className, intrface.getName(), ex.getCause(), ex.getMessage()});
            return null;
        }
        catch (NoSuchMethodException | InvocationTargetException e) {
            try {
                LOG.info("Invoking default constructor. ClassName={}, interfaceName={}", (Object)className, (Object)intrface.getName());
                instance = this.getClass().getClassLoader().loadClass(className).asSubclass(intrface).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException ex) {
                LOG.error("Unable to invoke default constructor. ClassName={}, interfaceName={}, cause={}, errorMessage={}", new Object[]{className, intrface.getName(), ex.getCause(), ex.getMessage()});
                return null;
            }
        }
        return instance;
    }

    public RoutingResults internalPublish(MqttPublishMessage msg, String clientId) {
        int messageID = msg.variableHeader().packetId();
        if (!this.initialized) {
            LOG.error("Moquette is not started, internal message cannot be published. CId: {}, messageId: {}", (Object)clientId, (Object)messageID);
            throw new IllegalStateException("Can't publish on a integration is not yet started");
        }
        LOG.trace("Internal publishing message CId: {}, messageId: {}", (Object)clientId, (Object)messageID);
        RoutingResults routingResults = this.dispatcher.internalPublish(msg);
        msg.payload().release();
        return routingResults;
    }

    public void stopServer() {
        LOG.info("Unbinding integration from the configured ports");
        if (this.acceptor == null) {
            LOG.error("Closing a badly started server, exit immediately");
            return;
        }
        this.acceptor.close();
        LOG.trace("Stopping MQTT protocol processor");
        this.initialized = false;
        this.scheduler.shutdownNow();
        this.sessions.close();
        if (this.h2Builder != null) {
            LOG.trace("Shutting down H2 persistence {}");
            this.h2Builder.closeStore();
        }
        this.interceptor.stop();
        this.dispatcher.terminate();
        LOG.info("Moquette integration has been stopped.");
    }

    public int getPort() {
        return this.acceptor.getPort();
    }

    public int getSslPort() {
        return this.acceptor.getSslPort();
    }

    public void addInterceptHandler(InterceptHandler interceptHandler) {
        if (!this.initialized) {
            LOG.error("Moquette is not started, MQTT message interceptor cannot be added. InterceptorId={}", (Object)interceptHandler.getID());
            throw new IllegalStateException("Can't register interceptors on a integration that is not yet started");
        }
        LOG.info("Adding MQTT message interceptor. InterceptorId={}", (Object)interceptHandler.getID());
        this.interceptor.addInterceptHandler(interceptHandler);
    }

    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        if (!this.initialized) {
            LOG.error("Moquette is not started, MQTT message interceptor cannot be removed. InterceptorId={}", (Object)interceptHandler.getID());
            throw new IllegalStateException("Can't deregister interceptors from a integration that is not yet started");
        }
        LOG.info("Removing MQTT message interceptor. InterceptorId={}", (Object)interceptHandler.getID());
        this.interceptor.removeInterceptHandler(interceptHandler);
    }

    public Collection<ClientDescriptor> listConnectedClients() {
        if (!this.initialized) {
            LOG.error("Moquette is not started, MQTT clients listing unavailable");
            throw new IllegalStateException("Can't get clients list from a Server that is not yet started");
        }
        return this.sessions.listConnectedClients();
    }

    public boolean disconnectClient(String clientId) {
        return this.sessions.dropSession(clientId, false);
    }

    public boolean disconnectAndPurgeClientState(String clientId) {
        return this.sessions.dropSession(clientId, true);
    }

    public FluentConfig withConfig() {
        return new FluentConfig(this);
    }
}

