package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/* loaded from: input_file:storm/starter/ReachTopology.class */
public class ReachTopology {
    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() { // from class: storm.starter.ReachTopology.1
        {
            put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
            put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
            put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
        }
    };
    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() { // from class: storm.starter.ReachTopology.2
        {
            put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
            put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
            put("tim", Arrays.asList("alex"));
            put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
            put("adam", Arrays.asList("david", "carissa"));
            put("mike", Arrays.asList("john", "bob"));
            put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
        }
    };

    /* loaded from: input_file:storm/starter/ReachTopology$CountAggregator.class */
    /**
     * Batch bolt that adds up the integer found at index 1 of every tuple it
     * receives and, when the batch finishes, emits a single
     * {@code (id, reach)} tuple carrying that total.
     */
    public static class CountAggregator extends BaseBatchBolt {
        /** Collector handed over in {@code prepare}; used to emit the total. */
        BatchOutputCollector _collector;
        /** Batch id handed over in {@code prepare}; echoed in the output tuple. */
        Object _id;
        /** Running sum of the partial counts seen so far. */
        int _count = 0;

        /**
         * Stores the collector and the batch id for later use.
         *
         * @param conf      configuration map (unused)
         * @param context   topology context (unused)
         * @param collector collector used by {@link #finishBatch()}
         * @param id        id emitted as the first field of the result
         */
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id;
        }

        /**
         * Adds the integer at index 1 of {@code tuple} to the running total.
         *
         * @param tuple incoming tuple whose field 1 holds an integer count
         */
        public void execute(Tuple tuple) {
            int partial = tuple.getInteger(1);
            _count += partial;
        }

        /** Emits {@code (id, total)} for this batch. */
        public void finishBatch() {
            _collector.emit(new Values(_id, _count));
        }

        /**
         * Declares the two output fields, {@code "id"} and {@code "reach"}.
         *
         * @param declarer receives the field declaration
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "reach"));
        }
    }

    /* loaded from: input_file:storm/starter/ReachTopology$GetFollowers.class */
    /**
     * Basic bolt that looks up the string at tuple index 1 in
     * {@link ReachTopology#FOLLOWERS_DB} and emits one {@code (id, follower)}
     * tuple per entry found. Emits nothing when the key is absent.
     */
    public static class GetFollowers extends BaseBasicBolt {
        /**
         * Emits one tuple per follower recorded for the incoming name.
         *
         * @param tuple     input; field 0 is passed through as the id, field 1 is the lookup key
         * @param collector collector the {@code (id, follower)} tuples are emitted to
         */
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            Object id = tuple.getValue(0);
            String tweeter = tuple.getString(1);
            List<String> followers = ReachTopology.FOLLOWERS_DB.get(tweeter);
            if (followers == null) {
                // Unknown key: nothing to emit.
                return;
            }
            for (String follower : followers) {
                collector.emit(new Values(id, follower));
            }
        }

        /**
         * Declares the two output fields, {@code "id"} and {@code "follower"}.
         *
         * @param declarer receives the field declaration
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "follower"));
        }
    }

    /* loaded from: input_file:storm/starter/ReachTopology$GetTweeters.class */
    /**
     * Basic bolt that looks up the string at tuple index 1 in
     * {@link ReachTopology#TWEETERS_DB} and emits one {@code (id, tweeter)}
     * tuple per entry found. Emits nothing when the key is absent.
     */
    public static class GetTweeters extends BaseBasicBolt {
        /**
         * Emits one tuple per name recorded for the incoming URL.
         *
         * @param tuple     input; field 0 is passed through as the id, field 1 is the lookup key
         * @param collector collector the {@code (id, tweeter)} tuples are emitted to
         */
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            Object id = tuple.getValue(0);
            String url = tuple.getString(1);
            List<String> tweeters = ReachTopology.TWEETERS_DB.get(url);
            if (tweeters == null) {
                // Unknown key: nothing to emit.
                return;
            }
            for (String tweeter : tweeters) {
                collector.emit(new Values(id, tweeter));
            }
        }

        /**
         * Declares the two output fields, {@code "id"} and {@code "tweeter"}.
         *
         * @param declarer receives the field declaration
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "tweeter"));
        }
    }

    /* loaded from: input_file:storm/starter/ReachTopology$PartialUniquer.class */
    /**
     * Batch bolt that collects the distinct strings found at index 1 of the
     * tuples it receives and, when the batch finishes, emits a single
     * {@code (id, partial-count)} tuple carrying the number of distinct values.
     */
    public static class PartialUniquer extends BaseBatchBolt {
        /** Collector handed over in {@code prepare}; used to emit the count. */
        BatchOutputCollector _collector;
        /** Batch id handed over in {@code prepare}; echoed in the output tuple. */
        Object _id;
        /** Distinct values seen so far; the set de-duplicates repeats. */
        Set<String> _followers = new HashSet<String>();

        /**
         * Stores the collector and the batch id for later use.
         *
         * @param conf      configuration map (unused)
         * @param context   topology context (unused)
         * @param collector collector used by {@link #finishBatch()}
         * @param id        id emitted as the first field of the result
         */
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this._collector = collector;
            this._id = id;
        }

        /**
         * Records the string at index 1 of {@code tuple}; duplicates are ignored by the set.
         *
         * @param tuple incoming tuple whose field 1 holds the value to record
         */
        public void execute(Tuple tuple) {
            this._followers.add(tuple.getString(1));
        }

        /** Emits {@code (id, number of distinct values seen)} for this batch. */
        public void finishBatch() {
            this._collector.emit(new Values(this._id, this._followers.size()));
        }

        /**
         * Declares the two output fields, {@code "id"} and {@code "partial-count"}.
         *
         * @param declarer receives the field declaration
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "partial-count"));
        }
    }

    /**
     * Builds the linear DRPC topology for the {@code "reach"} function.
     *
     * <p>The bolts are chained in this order, with the given parallelism hints:
     * {@link GetTweeters} (4), {@link GetFollowers} (12, shuffle grouping),
     * {@link PartialUniquer} (6, fields grouping on {@code id} and
     * {@code follower}) and {@link CountAggregator} (3, fields grouping on
     * {@code id}).
     *
     * @return the configured builder; the caller decides between a local and a remote topology
     */
    public static LinearDRPCTopologyBuilder construct() {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");

        builder.addBolt(new GetTweeters(), 4);
        builder.addBolt(new GetFollowers(), 12)
                .shuffleGrouping();
        // Grouping on (id, follower) sends equal pairs to the same PartialUniquer task.
        builder.addBolt(new PartialUniquer(), 6)
                .fieldsGrouping(new Fields("id", "follower"));
        builder.addBolt(new CountAggregator(), 3)
                .fieldsGrouping(new Fields("id"));

        return builder;
    }

    /**
     * Entry point.
     *
     * <p>With at least one argument, submits the remote topology under the name
     * {@code args[0]} using 6 workers. With no arguments, runs the topology on a
     * local cluster with max task parallelism 3, executes the {@code "reach"}
     * DRPC function for three sample URLs, prints each result, and shuts the
     * local cluster and local DRPC server down.
     *
     * @param args optional; {@code args[0]} is the topology name for remote submission
     * @throws Exception if submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = construct();
        Config config = new Config();

        if (args != null && args.length != 0) {
            config.setNumWorkers(6);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createRemoteTopology());
            return;
        }

        config.setMaxTaskParallelism(3);
        LocalDRPC localDRPC = new LocalDRPC();
        try {
            LocalCluster localCluster = new LocalCluster();
            try {
                localCluster.submitTopology("reach-drpc", config, builder.createLocalTopology(localDRPC));
                String[] urlsToTry = {"foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com"};
                for (String url : urlsToTry) {
                    System.out.println("Reach of " + url + ": " + localDRPC.execute("reach", url));
                }
            } finally {
                // Shut down even when submission or a DRPC call throws, so the
                // local cluster is not left running.
                localCluster.shutdown();
            }
        } finally {
            // Same order as before: cluster first, then the DRPC server.
            localDRPC.shutdown();
        }
    }
}
