package org.apache.storm.hdfs.trident;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.storm.hdfs.trident.HdfsState;
import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
import storm.trident.Stream;
import storm.trident.TridentTopology;

/* loaded from: input_file:org/apache/storm/hdfs/trident/TridentFileTopology.class */
public class TridentFileTopology {
    public static StormTopology buildTopology(String str) {
        FixedBatchSpout fixedBatchSpout = new FixedBatchSpout(new Fields(new String[]{"sentence", "key"}), 1000, new Values(new Object[]{"the cow jumped over the moon", 1L}), new Values(new Object[]{"the man went to the store and bought some candy", 2L}), new Values(new Object[]{"four score and seven years ago", 3L}), new Values(new Object[]{"how many apples can you eat", 4L}), new Values(new Object[]{"to be or not to be the person", 5L}));
        fixedBatchSpout.setCycle(true);
        TridentTopology tridentTopology = new TridentTopology();
        Stream newStream = tridentTopology.newStream("spout1", fixedBatchSpout);
        Fields fields = new Fields(new String[]{"sentence", "key"});
        DefaultFileNameFormat withExtension = new DefaultFileNameFormat().withPath("/tmp/trident").withPrefix("trident").withExtension(".txt");
        newStream.partitionPersist(new HdfsStateFactory().withOptions(new HdfsState.HdfsFileOptions().withFileNameFormat(withExtension).withRecordFormat(new DelimitedRecordFormat().withFields(fields)).withRotationPolicy(new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB)).withFsUrl(str)), fields, new HdfsUpdater(), new Fields(new String[0]));
        return tridentTopology.build();
    }

    public static void main(String[] strArr) throws Exception {
        Config config = new Config();
        config.setMaxSpoutPending(5);
        if (strArr.length == 1) {
            new LocalCluster().submitTopology("wordCounter", config, buildTopology(strArr[0]));
            Thread.sleep(120000L);
        } else if (strArr.length != 2) {
            System.out.println("Usage: TridentFileTopology <hdfs url> [topology name]");
        } else {
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(strArr[1], config, buildTopology(strArr[0]));
        }
    }
}
