/*
 * Decompiled with CFR 0.152.
 */
package storm.example;

import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import storm.example.AggregationBolt;
import storm.example.AverageCountBolt;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.KeyValueScheme;
import storm.kafka.KeyValueSchemeAsMultiScheme;
import storm.kafka.SpoutConfig;
import storm.kafka.StringKeyValueScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.mapper.TupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import storm.kafka.bolt.selector.KafkaTopicSelector;

/**
 * Entry point that wires a Kafka spout, two processing bolts and two Kafka
 * sink bolts into a single Storm topology and submits it under the name
 * {@code streamsTest}.
 *
 * <p>Wiring, as built in {@link #main(String[])}:
 * <ul>
 *   <li>{@code spout} reads from the input topic;</li>
 *   <li>{@code aggregationBolt} consumes {@code spout} (shuffle grouping) and
 *       feeds {@code aggregationToKafka}, which writes to the "source data"
 *       topic;</li>
 *   <li>{@code averageCounter} consumes {@code spout} (global grouping) and
 *       feeds {@code averageCountToKafka}, which writes to the "count"
 *       topic.</li>
 * </ul>
 */
public class DemoTopology {
    public static final String KEY_FIELD = "key";
    public static final String MESSAGE_FIELD = "message";
    public static final String TOPIC_FIELD = "topic";
    public static final String ATTEMP_FIELD = "attempt";

    /** Number of positional command-line arguments that {@code main} reads (indices 0..5). */
    private static final int REQUIRED_ARG_COUNT = 6;

    /** Classpath location of the base Kafka/topology properties. */
    private static final String KAFKA_PROPERTIES_RESOURCE = "/kafka.properties";

    /**
     * Builds and submits the topology.
     *
     * <p>Expected arguments, in order:
     * <ol start="0">
     *   <li>ZooKeeper connection string ({@code host:port});</li>
     *   <li>Kafka bootstrap server ({@code host:port});</li>
     *   <li>{@code "true"} to use the Kafka 0.9 API, anything else otherwise;</li>
     *   <li>topic to read from (also used as the {@code group.id});</li>
     *   <li>topic that receives the output of {@code averageCounter};</li>
     *   <li>topic that receives the output of {@code aggregationBolt}.</li>
     * </ol>
     * If fewer than six arguments are supplied, a usage line is printed to
     * standard error and the method returns without submitting anything.
     *
     * @param args command-line arguments as described above
     * @throws AlreadyAliveException propagated from topology submission
     * @throws InvalidTopologyException propagated from topology submission
     * @throws AuthorizationException propagated from topology submission
     * @throws IOException if {@code /kafka.properties} is missing from the
     *         classpath or cannot be read
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IOException {
        // All six positions are read below, so validate before touching any of them.
        if (args.length < REQUIRED_ARG_COUNT) {
            System.err.println("usage: command <zookeeperHost:port> <bootstrap server host:port> <use kafka 09 API: true/false> <topic to read from> <topic to write count> <topic to send source data>");
            return;
        }

        String zkConnString = args[0];
        String bootstrapServers = args[1];
        boolean useKafka09Api = "true".equals(args[2]);
        String topicName = args[3];
        String topicNameForAverage = args[4];
        String topicThatReceivesOriginalData = args[5];

        // The same Properties object is used both as Kafka producer config
        // and as the topology configuration passed to StormSubmitter.
        Properties props = loadKafkaProperties();
        props.put("topology.message.timeout.secs", 60);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", topicName);

        TopologyBuilder builder = new TopologyBuilder();

        // Source: Kafka spout emitting a single "message" field (see KafkaBoltKeyValueScheme).
        ZkHosts hosts = new ZkHosts(zkConnString);
        // Third argument is the topic name as-is for the 0.9 API and "/"-prefixed otherwise;
        // NOTE(review): presumably the spout's ZooKeeper root path - confirm against SpoutConfig.
        // The fourth argument is a fresh random UUID on every launch.
        String thirdSpoutArg = useKafka09Api ? topicName : "/" + topicName;
        SpoutConfig spoutConfig = new SpoutConfig((BrokerHosts)hosts, topicName, thirdSpoutArg, UUID.randomUUID().toString());
        spoutConfig.scheme = new KeyValueSchemeAsMultiScheme((KeyValueScheme)new KafkaBoltKeyValueScheme());
        if (useKafka09Api) {
            spoutConfig.kafkaAPIv = "0.9";
        }
        KafkaSpout spout = new KafkaSpout(spoutConfig);
        builder.setSpout("spout", (IRichSpout)spout, (Number)1);

        // Processing bolts; both are constructed with the same period value.
        long tuplesCountPeriodInSecs = 5L;
        AggregationBolt averageBolt = new AggregationBolt(tuplesCountPeriodInSecs);
        builder.setBolt("aggregationBolt", averageBolt, (Number)1).shuffleGrouping("spout");
        AverageCountBolt averageCountBolt = new AverageCountBolt(tuplesCountPeriodInSecs);
        builder.setBolt("averageCounter", averageCountBolt, (Number)1).globalGrouping("spout");

        // Sink for aggregationBolt output.
        KafkaBolt dataToKafkaBolt = new KafkaBolt().withTopicSelector((KafkaTopicSelector)new DefaultTopicSelector(topicThatReceivesOriginalData)).withTupleToKafkaMapper((TupleToKafkaMapper)new FieldNameBasedTupleToKafkaMapper()).withProducerProperties(props);
        builder.setBolt("aggregationToKafka", (IRichBolt)dataToKafkaBolt, (Number)1).fieldsGrouping("aggregationBolt", new Fields(new String[]{KEY_FIELD, MESSAGE_FIELD, TOPIC_FIELD, ATTEMP_FIELD}));

        // Sink for averageCounter output.
        KafkaBolt averageCountToKafkaBolt = new KafkaBolt().withTopicSelector((KafkaTopicSelector)new DefaultTopicSelector(topicNameForAverage)).withTupleToKafkaMapper((TupleToKafkaMapper)new FieldNameBasedTupleToKafkaMapper()).withProducerProperties(props);
        builder.setBolt("averageCountToKafka", (IRichBolt)averageCountToKafkaBolt, (Number)1).globalGrouping("averageCounter");

        StormSubmitter.submitTopology("streamsTest", props, builder.createTopology());
    }

    /**
     * Loads {@code /kafka.properties} from the classpath, closing the stream afterwards.
     *
     * @return the loaded properties
     * @throws IOException if the resource is absent or reading it fails
     */
    private static Properties loadKafkaProperties() throws IOException {
        Properties props = new Properties();
        try (InputStream in = DemoTopology.class.getResourceAsStream(KAFKA_PROPERTIES_RESOURCE)) {
            // getResourceAsStream signals "not found" with null rather than an exception.
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + KAFKA_PROPERTIES_RESOURCE);
            }
            props.load(in);
        }
        return props;
    }

    /**
     * Key/value scheme for the spout that declares a single output field
     * named {@link DemoTopology#MESSAGE_FIELD}.
     */
    public static class KafkaBoltKeyValueScheme
    extends StringKeyValueScheme {
        /**
         * @return a {@link Fields} instance containing only {@code "message"}
         */
        public Fields getOutputFields() {
            return new Fields(new String[]{DemoTopology.MESSAGE_FIELD});
        }
    }
}

