package storm.example;

import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.UUID;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import storm.kafka.KafkaSpout;
import storm.kafka.KeyValueSchemeAsMultiScheme;
import storm.kafka.SpoutConfig;
import storm.kafka.StringKeyValueScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;

/* loaded from: input_file:storm/example/DemoTopology.class */
/**
 * Command-line launcher that wires a Kafka spout, two processing bolts and two
 * Kafka-writing bolts into a Storm topology and submits it under the name
 * {@code "streamsTest"}.
 *
 * <p>Expected arguments, in order:
 * <ol>
 *   <li>ZooKeeper connect string passed to {@link ZkHosts}</li>
 *   <li>Kafka bootstrap servers</li>
 *   <li>{@code "true"} to select the Kafka 0.9 API on the spout, anything else otherwise</li>
 *   <li>topic the spout reads from (also used as the consumer group id)</li>
 *   <li>topic written by the {@code averageCountToKafka} bolt</li>
 *   <li>topic written by the {@code aggregationToKafka} bolt</li>
 * </ol>
 */
public class DemoTopology {
    public static final String KEY_FIELD = "key";
    public static final String MESSAGE_FIELD = "message";
    public static final String TOPIC_FIELD = "topic";
    // NOTE(review): "ATTEMP" looks like a typo for "ATTEMPT"; the name is public, so it is kept as is.
    public static final String ATTEMP_FIELD = "attempt";

    /** Number of positional arguments {@link #main(String[])} reads (indices 0 through 5). */
    private static final int REQUIRED_ARG_COUNT = 6;

    /** Classpath location of the base Kafka/topology properties. */
    private static final String KAFKA_PROPERTIES_RESOURCE = "/kafka.properties";

    /** Value of the third argument that switches the spout to the Kafka 0.9 API. */
    private static final String KAFKA_09_ENABLED = "true";

    /**
     * Key/value scheme whose declared output is the single field {@value DemoTopology#MESSAGE_FIELD}.
     */
    /* loaded from: input_file:storm/example/DemoTopology$KafkaBoltKeyValueScheme.class */
    public static class KafkaBoltKeyValueScheme extends StringKeyValueScheme {
        /**
         * @return a {@link Fields} instance containing only the message field name
         */
        @Override // storm.kafka.StringScheme
        public Fields getOutputFields() {
            return new Fields(MESSAGE_FIELD);
        }
    }

    /**
     * Builds and submits the topology.
     *
     * <p>Prints a usage message to standard error and returns without submitting
     * anything when fewer than six arguments are supplied.
     *
     * @param strArr command-line arguments, see the class description for their order
     * @throws AlreadyAliveException    propagated from {@link StormSubmitter#submitTopology}
     * @throws InvalidTopologyException propagated from {@link StormSubmitter#submitTopology}
     * @throws AuthorizationException   propagated from {@link StormSubmitter#submitTopology}
     * @throws IOException              if {@code /kafka.properties} is missing from the classpath
     *                                  or cannot be read
     */
    public static void main(String[] strArr) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IOException {
        // All six positional arguments are read below, so anything shorter is a usage error.
        if (strArr.length < REQUIRED_ARG_COUNT) {
            System.err.println("usage: command <zookeeperHost:port> <bootstrap server host:port> <use kafka 09 API: true/false> <topic to read from> <topic to write count> <topic to send source data>");
            return;
        }

        String zkConnect = strArr[0];
        String bootstrapServers = strArr[1];
        boolean useKafka09Api = KAFKA_09_ENABLED.equals(strArr[2]);
        String sourceTopic = strArr[3];
        String averageCountTopic = strArr[4];
        String aggregationTopic = strArr[5];

        // The same Properties object serves as the Kafka producer configuration for both
        // KafkaBolts and as the topology configuration handed to StormSubmitter.
        Properties properties = loadKafkaProperties();
        properties.put("topology.message.timeout.secs", 60);
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, sourceTopic);

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new KafkaSpout(buildSpoutConfig(zkConnect, sourceTopic, useKafka09Api)), 1);

        // Both processing bolts consume the spout's stream directly.
        // NOTE(review): the meaning of the 5L constructor argument is defined by the bolt classes, not visible here.
        topologyBuilder.setBolt("aggregationBolt", new AggregationBolt(5L), 1).shuffleGrouping("spout");
        topologyBuilder.setBolt("averageCounter", new AverageCountBolt(5L), 1).globalGrouping("spout");

        // Each processing bolt has its own Kafka sink writing to a fixed topic.
        topologyBuilder.setBolt("aggregationToKafka", new KafkaBolt().withTopicSelector(new DefaultTopicSelector(aggregationTopic)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()).withProducerProperties(properties), 1)
                .fieldsGrouping("aggregationBolt", new Fields(KEY_FIELD, MESSAGE_FIELD, TOPIC_FIELD, ATTEMP_FIELD));
        topologyBuilder.setBolt("averageCountToKafka", new KafkaBolt().withTopicSelector(new DefaultTopicSelector(averageCountTopic)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()).withProducerProperties(properties), 1)
                .globalGrouping("averageCounter");

        StormSubmitter.submitTopology("streamsTest", properties, topologyBuilder.createTopology());
    }

    /**
     * Loads {@code /kafka.properties} from the classpath, closing the stream afterwards.
     *
     * @return the loaded properties
     * @throws IOException if the resource does not exist or reading it fails
     */
    private static Properties loadKafkaProperties() throws IOException {
        Properties properties = new Properties();
        try (InputStream in = DemoTopology.class.getResourceAsStream(KAFKA_PROPERTIES_RESOURCE)) {
            // getResourceAsStream returns null for a missing resource; fail with a clear message
            // instead of letting Properties.load throw a bare NullPointerException.
            if (in == null) {
                throw new IOException("Classpath resource not found: " + KAFKA_PROPERTIES_RESOURCE);
            }
            properties.load(in);
        }
        return properties;
    }

    /**
     * Creates the spout configuration for the given source topic.
     *
     * <p>The ZooKeeper root is the bare topic name when the Kafka 0.9 API is selected and
     * {@code "/" + topic} otherwise; the spout id is a fresh random UUID on every call.
     *
     * @param zkConnect     ZooKeeper connect string
     * @param topic         topic the spout reads from
     * @param useKafka09Api whether to set {@code kafkaAPIv} to {@code "0.9"}
     * @return the configured {@link SpoutConfig}
     */
    private static SpoutConfig buildSpoutConfig(String zkConnect, String topic, boolean useKafka09Api) {
        String zkRoot = useKafka09Api ? topic : ZKPaths.PATH_SEPARATOR + topic;
        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkConnect), topic, zkRoot, UUID.randomUUID().toString());
        spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new KafkaBoltKeyValueScheme());
        if (useKafka09Api) {
            spoutConfig.kafkaAPIv = "0.9";
        }
        return spoutConfig;
    }
}
