package org.apache.storm.eventhubs.samples;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.metric.LoggingMetricsConsumer;
import backtype.storm.topology.TopologyBuilder;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
import org.apache.storm.eventhubs.samples.bolt.PartialCountBolt;
import org.apache.storm.eventhubs.spout.EventHubSpout;
import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;

/* loaded from: input_file:org/apache/storm/eventhubs/samples/EventCount.class */
/**
 * Sample topology that reads from an {@link EventHubSpout} and counts events with a
 * {@link PartialCountBolt} feeding a single {@link GlobalCountBolt}.
 *
 * <p>Command-line arguments, as consumed by this class:
 * <ul>
 *   <li>{@code args[0]} (optional): topology name. When present the topology is submitted
 *       through {@link StormSubmitter}; when absent it runs in a {@link LocalCluster}.</li>
 *   <li>{@code args[1]} (optional): path to a properties file. When absent,
 *       {@code Config.properties} is loaded from the classpath.</li>
 * </ul>
 */
public class EventCount {
    /** Spout configuration assembled by {@link #readEHConfig(String[])}. */
    protected EventHubSpoutConfig spoutConfig;

    /** Worker count used for cluster submission; set to the spout's partition count. */
    protected int numWorkers;

    /**
     * Loads the spout settings from a properties source and populates {@link #spoutConfig}
     * and {@link #numWorkers}.
     *
     * @param strArr command-line arguments (may be {@code null} or empty); see the class
     *               description for how each position is used
     * @throws FileNotFoundException if the properties file or the classpath resource
     *                               {@code Config.properties} cannot be found
     * @throws NumberFormatException if a required integer property is missing or any
     *                               integer property is malformed
     * @throws Exception on any other failure while reading the configuration
     */
    protected void readEHConfig(String[] strArr) throws Exception {
        Properties properties = loadProperties(strArr);

        String username = properties.getProperty("eventhubspout.username");
        String password = properties.getProperty("eventhubspout.password");
        String namespace = properties.getProperty("eventhubspout.namespace");
        String entityPath = properties.getProperty("eventhubspout.entitypath");
        String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress");
        String zkConnectionString = properties.getProperty("zookeeper.connectionstring");

        // Integer settings are parsed in the same order as before so the first
        // failure reported for a broken file does not change.
        int partitionCount = intProperty(properties, "eventhubspout.partitions.count", null);
        int checkpointInterval = intProperty(properties, "eventhubspout.checkpoint.interval", null);
        int receiverCredits = intProperty(properties, "eventhub.receiver.credits", null);
        int maxPendingPerPartition =
                intProperty(properties, "eventhubspout.max.pending.messages.per.partition", "1024");
        int enqueueTimeDiff = intProperty(properties, "eventhub.receiver.filter.timediff", "0");

        // A non-zero time difference is scaled by 1000 and subtracted from the current
        // time in milliseconds; zero leaves the filter at 0. The multiplication is done
        // in long arithmetic so large values cannot overflow an int.
        long enqueueTimeFilter = 0L;
        if (enqueueTimeDiff != 0) {
            enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff * 1000L;
        }

        String consumerGroupName = properties.getProperty("eventhubspout.consumer.group.name");

        System.out.println("Eventhub spout config: ");
        System.out.println("  partition count: " + partitionCount);
        System.out.println("  checkpoint interval: " + checkpointInterval);
        System.out.println("  receiver credits: " + receiverCredits);

        this.spoutConfig = new EventHubSpoutConfig(username, password, namespace, entityPath,
                partitionCount, zkConnectionString, checkpointInterval, receiverCredits,
                maxPendingPerPartition, enqueueTimeFilter);
        if (targetFqnAddress != null) {
            this.spoutConfig.setTargetAddress(targetFqnAddress);
        }
        this.spoutConfig.setConsumerGroupName(consumerGroupName);
        this.numWorkers = this.spoutConfig.getPartitionCount();
        if (strArr != null && strArr.length > 0) {
            this.spoutConfig.setTopologyName(strArr[0]);
        }
    }

    /**
     * Reads the properties from the file named by {@code strArr[1]}, or from the classpath
     * resource {@code Config.properties} when no file argument was given. The underlying
     * reader or stream is always closed.
     */
    private static Properties loadProperties(String[] strArr) throws IOException {
        Properties properties = new Properties();
        if (strArr != null && strArr.length > 1) {
            FileReader reader = new FileReader(strArr[1]);
            try {
                properties.load(reader);
            } finally {
                reader.close();
            }
            return properties;
        }

        InputStream stream =
                EventCount.class.getClassLoader().getResourceAsStream("Config.properties");
        if (stream == null) {
            // getResourceAsStream signals "not found" with null; fail with a clear message
            // instead of letting Properties.load throw a NullPointerException.
            throw new FileNotFoundException("Config.properties not found on the classpath");
        }
        try {
            properties.load(stream);
        } finally {
            stream.close();
        }
        return properties;
    }

    /**
     * Parses an integer property, naming the offending key in the error message.
     *
     * @param defaultValue textual default used when the key is absent, or {@code null}
     *                     when the property is required
     * @throws NumberFormatException if the property is required but absent, or its value
     *                               is not a valid integer
     */
    private static int intProperty(Properties properties, String key, String defaultValue) {
        String raw = properties.getProperty(key, defaultValue);
        if (raw == null) {
            throw new NumberFormatException("Missing required integer property '" + key + "'");
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            NumberFormatException detailed = new NumberFormatException(
                    "Property '" + key + "' is not a valid integer: '" + raw + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Creates the spout from the current {@link #spoutConfig}.
     *
     * @return a new {@link EventHubSpout} built from {@link #spoutConfig}
     */
    protected EventHubSpout createEventHubSpout() {
        return new EventHubSpout(this.spoutConfig);
    }

    /**
     * Wires the spout and the two counting bolts into a topology.
     *
     * <p>The spout and {@code PartialCountBolt} both use the configured partition count as
     * parallelism hint and task count; {@code GlobalCountBolt} runs as a single task fed
     * through a global grouping.
     *
     * @param eventHubSpout the spout to register under the id {@code "EventHubsSpout"}
     * @return the assembled topology
     */
    protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
        Integer partitionCount = Integer.valueOf(this.spoutConfig.getPartitionCount());
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, partitionCount)
                .setNumTasks(partitionCount);
        topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), partitionCount)
                .localOrShuffleGrouping("EventHubsSpout")
                .setNumTasks(partitionCount);
        topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
                .globalGrouping("PartialCountBolt")
                .setNumTasks(1);
        return topologyBuilder.createTopology();
    }

    /**
     * Submits the topology to a cluster when a topology name was supplied, otherwise runs
     * it in an in-process {@link LocalCluster} under the name {@code "test"}.
     *
     * @param strArr        command-line arguments; {@code strArr[0]} is the topology name
     *                      for cluster submission (may be {@code null} or empty)
     * @param stormTopology the topology to run
     * @throws Exception if submission fails or the local run is interrupted
     */
    protected void submitTopology(String[] strArr, StormTopology stormTopology) throws Exception {
        Config config = new Config();
        config.setDebug(false);
        config.registerMetricsConsumer(LoggingMetricsConsumer.class, 1L);
        if (strArr != null && strArr.length > 0) {
            config.setNumWorkers(this.numWorkers);
            StormSubmitter.submitTopology(strArr[0], config, stormTopology);
            return;
        }
        config.setMaxTaskParallelism(2);
        LocalCluster localCluster = new LocalCluster();
        try {
            localCluster.submitTopology("test", config, stormTopology);
            Thread.sleep(5000000L);
        } finally {
            // Shut the local cluster down even if submission fails or the sleep is interrupted.
            localCluster.shutdown();
        }
    }

    /**
     * Runs the full sample: read configuration, build the topology, submit it.
     *
     * @param strArr command-line arguments; see the class description
     * @throws Exception if configuration loading or topology submission fails
     */
    public void runScenario(String[] strArr) throws Exception {
        readEHConfig(strArr);
        submitTopology(strArr, buildTopology(createEventHubSpout()));
    }

    /**
     * Command-line entry point.
     *
     * @param strArr command-line arguments; see the class description
     * @throws Exception if the scenario fails
     */
    public static void main(String[] strArr) throws Exception {
        new EventCount().runScenario(strArr);
    }
}
