/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.flux;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInitialStatus;
import backtype.storm.utils.Utils;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.storm.flux.FluxBuilder;
import org.apache.storm.flux.model.BoltDef;
import org.apache.storm.flux.model.ExecutionContext;
import org.apache.storm.flux.model.SpoutDef;
import org.apache.storm.flux.model.StreamDef;
import org.apache.storm.flux.model.TopologyDef;
import org.apache.storm.flux.parser.FluxParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line entry point for Flux.
 *
 * <p>Parses a topology definition (from a file or a classpath resource), builds the
 * corresponding {@link StormTopology}, optionally prints a summary of it, and then either
 * submits it to a remote cluster or runs it for a limited time in a {@link LocalCluster}.
 */
public class Flux {
    private static final Logger LOG = LoggerFactory.getLogger(Flux.class);

    /** How long (in ms) a local-mode topology runs when {@code --sleep} is not given. */
    private static final long DEFAULT_LOCAL_SLEEP_TIME = 60000L;
    /** ZooKeeper port used when {@code --zookeeper} is given a host without a port. */
    private static final long DEFAULT_ZK_PORT = 2181L;

    private static final String OPTION_LOCAL = "local";
    private static final String OPTION_REMOTE = "remote";
    private static final String OPTION_RESOURCE = "resource";
    private static final String OPTION_SLEEP = "sleep";
    private static final String OPTION_DRY_RUN = "dry-run";
    private static final String OPTION_NO_DETAIL = "no-detail";
    private static final String OPTION_NO_SPLASH = "no-splash";
    private static final String OPTION_INACTIVE = "inactive";
    private static final String OPTION_ZOOKEEPER = "zookeeper";
    private static final String OPTION_FILTER = "filter";
    private static final String OPTION_ENV_FILTER = "env-filter";

    /**
     * Registers the supported command-line options, parses {@code args} and runs the CLI.
     * Exactly one positional argument (the topology definition path) is required; otherwise
     * the usage text is printed and the JVM exits with status 1.
     *
     * @param args raw command-line arguments
     * @throws Exception if option parsing, topology parsing/building, or local execution fails
     */
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        options.addOption(option(0, "l", OPTION_LOCAL, "Run the topology in local mode."));
        options.addOption(option(0, "r", OPTION_REMOTE, "Deploy the topology to a remote cluster."));
        options.addOption(option(0, "R", OPTION_RESOURCE, "Treat the supplied path as a classpath resource instead of a file."));
        options.addOption(option(1, "s", OPTION_SLEEP, "ms", "When running locally, the amount of time to sleep (in ms.) before killing the topology and shutting down the local cluster."));
        options.addOption(option(0, "d", OPTION_DRY_RUN, "Do not run or deploy the topology. Just build, validate, and print information about the topology."));
        options.addOption(option(0, "q", OPTION_NO_DETAIL, "Suppress the printing of topology details."));
        options.addOption(option(0, "n", OPTION_NO_SPLASH, "Suppress the printing of the splash screen."));
        options.addOption(option(0, "i", OPTION_INACTIVE, "Deploy the topology, but do not activate it."));
        options.addOption(option(1, "z", OPTION_ZOOKEEPER, "host:port", "When running in local mode, use the ZooKeeper at the specified <host>:<port> instead of the in-process ZooKeeper. (requires Storm 0.9.3 or later)"));
        options.addOption(option(1, "f", OPTION_FILTER, "file", "Perform property substitution. Use the specified file as a source of properties, and replace keys identified with {$[property name]} with the value defined in the properties file."));
        options.addOption(option(0, "e", OPTION_ENV_FILTER, "Perform environment variable substitution. Keys identified with `${ENV-[NAME]}` will be replaced with the corresponding `NAME` environment value"));

        CommandLine cmd = new BasicParser().parse(options, args);
        if (cmd.getArgs().length != 1) {
            usage(options);
            System.exit(1);
        }
        runCli(cmd);
    }

    /**
     * Builds an option whose argument name defaults to its long name.
     */
    private static Option option(int argCount, String shortName, String longName, String description) {
        return option(argCount, shortName, longName, longName, description);
    }

    /**
     * Builds a command-line option.
     *
     * @param argCount    number of arguments the option takes
     * @param shortName   single-dash name
     * @param longName    double-dash name
     * @param argName     argument placeholder shown in the help text
     * @param description help text
     * @return the configured option
     */
    private static Option option(int argCount, String shortName, String longName, String argName, String description) {
        // OptionBuilder accumulates its settings in static state; create() consumes them.
        OptionBuilder.hasArgs(argCount);
        OptionBuilder.withArgName(argName);
        OptionBuilder.withLongOpt(longName);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(shortName);
    }

    /**
     * Prints the command-line usage for the given options.
     */
    private static void usage(Options options) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("storm jar <my_topology_uber_jar.jar> " + Flux.class.getName() + " [options] <topology-config.yaml>", options);
    }

    /**
     * Executes the parsed command line: prints the splash screen (unless suppressed), parses
     * and builds the topology, prints its details (unless suppressed) and, unless this is a
     * dry run, submits it remotely or runs it in local mode.
     *
     * @param cmd parsed command line; its first positional argument is the topology path
     * @throws Exception if parsing, building, or local execution fails
     */
    private static void runCli(CommandLine cmd) throws Exception {
        if (!cmd.hasOption(OPTION_NO_SPLASH)) {
            printSplash();
        }

        TopologyDef topologyDef = parseTopologyDef(cmd);
        String topologyName = topologyDef.getName();
        Config conf = FluxBuilder.buildConfig(topologyDef);
        ExecutionContext context = new ExecutionContext(topologyDef, conf);
        StormTopology topology = FluxBuilder.buildTopology(context);

        if (!cmd.hasOption(OPTION_NO_DETAIL)) {
            printTopologyInfo(context);
        }
        if (cmd.hasOption(OPTION_DRY_RUN)) {
            return;
        }

        if (cmd.hasOption(OPTION_REMOTE)) {
            submitRemote(cmd, topologyName, conf, topology);
        } else {
            runLocal(cmd, topologyName, conf, topology);
        }
    }

    /**
     * Parses the topology definition named by the first positional argument, either as a
     * classpath resource ({@code --resource}) or as a file, applying the requested filters.
     */
    private static TopologyDef parseTopologyDef(CommandLine cmd) throws Exception {
        // NOTE(review): "dump-yaml" is not among the options registered in main(), so this
        // appears to always be false from the command line -- confirm whether that is intended.
        boolean dumpYaml = cmd.hasOption("dump-yaml");
        String filePath = (String) cmd.getArgList().get(0);
        // getOptionValue() yields null when --filter was not supplied.
        String filterProps = cmd.hasOption(OPTION_FILTER) ? cmd.getOptionValue(OPTION_FILTER) : null;
        boolean envFilter = cmd.hasOption(OPTION_ENV_FILTER);

        if (cmd.hasOption(OPTION_RESOURCE)) {
            printf("Parsing classpath resource: %s", filePath);
            return FluxParser.parseResource(filePath, dumpYaml, true, filterProps, envFilter);
        }
        printf("Parsing file: %s", new File(filePath).getAbsolutePath());
        return FluxParser.parseFile(filePath, dumpYaml, true, filterProps, envFilter);
    }

    /**
     * Submits the topology to a remote cluster, in the INACTIVE state when {@code --inactive}
     * is set and ACTIVE otherwise. Submission failures are logged as warnings, not rethrown.
     */
    private static void submitRemote(CommandLine cmd, String topologyName, Config conf, StormTopology topology) {
        LOG.info("Running remotely...");
        try {
            SubmitOptions submitOptions;
            if (cmd.hasOption(OPTION_INACTIVE)) {
                LOG.info("Deploying topology in an INACTIVE state...");
                submitOptions = new SubmitOptions(TopologyInitialStatus.INACTIVE);
            } else {
                LOG.info("Deploying topology in an ACTIVE state...");
                submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);
            }
            StormSubmitter.submitTopology(topologyName, conf, topology, submitOptions, null);
        } catch (Exception e) {
            LOG.warn("Unable to deploy topology to remote cluster.", e);
        }
    }

    /**
     * Runs the topology in a local cluster for the configured sleep time, then kills it.
     * The cluster is shut down even if submission, sleeping, or killing fails.
     */
    private static void runLocal(CommandLine cmd, String topologyName, Config conf, StormTopology topology) {
        LOG.info("Running in local mode...");
        String sleepStr = cmd.getOptionValue(OPTION_SLEEP);
        long sleepTime = sleepStr != null ? Long.parseLong(sleepStr) : DEFAULT_LOCAL_SLEEP_TIME;
        LOG.debug("Sleep time: {}", sleepTime);

        LocalCluster cluster = createLocalCluster(cmd);
        try {
            cluster.submitTopology(topologyName, conf, topology);
            Utils.sleep(sleepTime);
            cluster.killTopology(topologyName);
        } finally {
            // Always tear the cluster down, otherwise a failure above leaves it running.
            cluster.shutdown();
        }
    }

    /**
     * Creates the local cluster, attached to the ZooKeeper given by {@code --zookeeper}
     * ({@code host} or {@code host:port}) when present, or a default one otherwise.
     */
    private static LocalCluster createLocalCluster(CommandLine cmd) {
        if (!cmd.hasOption(OPTION_ZOOKEEPER)) {
            return new LocalCluster();
        }

        String zkStr = cmd.getOptionValue(OPTION_ZOOKEEPER);
        LOG.info("Using ZooKeeper at '{}' instead of in-process one.", zkStr);
        long zkPort = DEFAULT_ZK_PORT;
        String zkHost;
        if (zkStr.contains(":")) {
            String[] hostPort = zkStr.split(":");
            zkHost = hostPort[0];
            // "host:" splits into a single element, so fall back to the default port.
            if (hostPort.length > 1) {
                zkPort = Long.parseLong(hostPort[1]);
            }
        } else {
            zkHost = zkStr;
        }

        try {
            return new LocalCluster(zkHost, Long.valueOf(zkPort));
        } catch (NoSuchMethodError e) {
            // The (host, port) constructor is missing from the Storm version on the classpath.
            LOG.error("The --zookeeper option can only be used with Apache Storm 0.9.3 and later.");
            System.exit(1);
            throw e; // only reached if System.exit() does not terminate the JVM
        }
    }

    /**
     * Prints the name, spouts, bolts and streams of a DSL topology to standard output.
     * Nothing is printed when the topology definition is not a DSL topology.
     *
     * @param ctx execution context holding the topology definition
     */
    static void printTopologyInfo(ExecutionContext ctx) {
        TopologyDef t = ctx.getTopologyDef();
        if (!t.isDslTopology()) {
            return;
        }
        print("---------- TOPOLOGY DETAILS ----------");
        printf("Topology Name: %s", t.getName());
        print("--------------- SPOUTS ---------------");
        for (SpoutDef spout : t.getSpouts()) {
            printf("%s [%d] (%s)", spout.getId(), spout.getParallelism(), spout.getClassName());
        }
        print("---------------- BOLTS ---------------");
        for (BoltDef bolt : t.getBolts()) {
            printf("%s [%d] (%s)", bolt.getId(), bolt.getParallelism(), bolt.getClassName());
        }
        print("--------------- STREAMS ---------------");
        for (StreamDef stream : t.getStreams()) {
            printf("%s --%s--> %s", stream.getFrom(), stream.getGrouping().getType(), stream.getTo());
        }
        print("--------------------------------------");
    }

    /**
     * Formats the arguments with {@link String#format(String, Object...)} and prints the
     * result as a line on standard output.
     */
    private static void printf(String format2, Object ... args) {
        print(String.format(format2, args));
    }

    /**
     * Prints a line to standard output.
     */
    private static void print(String string2) {
        System.out.println(string2);
    }

    /**
     * Prints the {@code /splash.txt} classpath resource (read as UTF-8) to standard output,
     * line by line. Does nothing when the resource is absent.
     *
     * @throws IOException if the resource cannot be read
     */
    private static void printSplash() throws IOException {
        InputStream is = Flux.class.getResourceAsStream("/splash.txt");
        if (is == null) {
            return;
        }
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            // Release the underlying stream even if reading fails part-way through.
            is.close();
        }
    }
}

