/*
 * Decompiled with CFR 0.152.
 */
package storm.example;

import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.mapper.TupleToKafkaMapper;

public class AggregationBolt<K, V>
implements IRichBolt {
    protected OutputCollector collector;
    protected static final Logger log = Logger.getLogger(AggregationBolt.class);
    private long periodInSecs;
    private TupleToKafkaMapper<K, V> mapper;
    private List<Tuple> batch;

    public AggregationBolt(long periodInSecs) {
        this.periodInSecs = periodInSecs;
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        if (this.mapper == null) {
            this.mapper = new FieldNameBasedTupleToKafkaMapper();
        }
        this.batch = new CopyOnWriteArrayList<Tuple>();
    }

    public void execute(Tuple tuple) {
        if (AggregationBolt.isTickTuple(tuple)) {
            for (Tuple t : this.batch) {
                Object value;
                Object message = t.contains("message") ? ((value = t.getValueByField("message")) instanceof String ? ((String)value).getBytes() : (Object)value) : "".getBytes();
                this.collector.emit(t, (List)new Values(new Object[]{t.contains("key") ? t.getValueByField("key") : "", message, t.contains("topic") ? t.getValueByField("topic") : "", t.contains("attempt") ? t.getValueByField("attempt") : ""}));
                this.collector.ack(t);
            }
            this.batch.clear();
        } else {
            this.batch.add(tuple);
        }
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(new String[]{"key", "message", "topic", "attempt"}));
    }

    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put((Object)"topology.tick.tuple.freq.secs", (Object)this.periodInSecs);
        return conf;
    }

    private static boolean isTickTuple(Tuple tuple) {
        return tuple.getSourceComponent().equals("__system") && tuple.getSourceStreamId().equals("__tick");
    }
}

