package org.apache.storm.eventhubs.samples;

import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import org.apache.storm.eventhubs.spout.EventHubSpout;
import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;

/* loaded from: input_file:org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.class */
/**
 * Sample topology that counts events read through a
 * {@link TransactionalTridentEventHubSpout} using the Trident API and
 * periodically logs the running total.
 *
 * <p>The pipeline built by {@link #buildTopology(EventHubSpout)} is:
 * spout stream -> {@link Count} into {@code "partial-count"} ->
 * persistent {@link Sum} into {@code "count"} (kept in a
 * {@link MemoryMapState}) -> {@link LoggingFilter}.
 */
public class TransactionalTridentEventCount extends EventCount {

    /**
     * Trident filter that never keeps a tuple; its only purpose is to log the
     * tuples passing through it, rate-limited to at most one log line per
     * configured interval.
     */
    /* loaded from: input_file:org/apache/storm/eventhubs/samples/TransactionalTridentEventCount$LoggingFilter.class */
    public static class LoggingFilter extends BaseFilter {
        private static final long serialVersionUID = 1;
        private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class);
        private static final long NANOS_PER_MILLI = 1000000L;

        private final String prefix;
        private final long logIntervalMs;

        // System.nanoTime() readings are only comparable within a single JVM,
        // so the timing state must not travel with the serialized filter.
        // Both fields are transient: after deserialization they reset to
        // 0/false and the baseline is re-taken on the first isKeep() call.
        private transient long lastTime;
        private transient boolean baselineTaken;

        /**
         * @param str prefix prepended to every logged tuple
         * @param i   minimum number of milliseconds between two log lines
         */
        public LoggingFilter(String str, int i) {
            this.prefix = str;
            this.logIntervalMs = i;
            this.lastTime = System.nanoTime();
            this.baselineTaken = true;
        }

        /**
         * Logs the tuple if more than the configured interval has elapsed
         * since the last logged tuple (or since the timing baseline).
         *
         * @param tridentTuple the tuple to (possibly) log
         * @return always {@code false}; nothing is emitted downstream
         */
        public boolean isKeep(TridentTuple tridentTuple) {
            long now = System.nanoTime();
            if (!this.baselineTaken) {
                // First call in this JVM after deserialization: start the
                // interval here instead of using a foreign nanoTime value.
                this.lastTime = now;
                this.baselineTaken = true;
            }
            long elapsedMs = (now - this.lastTime) / NANOS_PER_MILLI;
            if (elapsedMs > this.logIntervalMs) {
                logger.info("{}{}", this.prefix, tridentTuple);
                this.lastTime = now;
            }
            return false;
        }
    }

    /**
     * Builds the Trident counting topology.
     *
     * @param eventHubSpout not used by this implementation; a
     *                      {@link TransactionalTridentEventHubSpout} is created
     *                      from {@code spoutConfig} instead
     * @return the assembled topology
     */
    @Override // org.apache.storm.eventhubs.samples.EventCount
    protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
        TridentTopology tridentTopology = new TridentTopology();

        String streamId = "stream-" + this.spoutConfig.getTopologyName();
        TransactionalTridentEventHubSpout tridentSpout =
                new TransactionalTridentEventHubSpout(this.spoutConfig);
        Fields partialCount = new Fields("partial-count");
        Fields count = new Fields("count");

        tridentTopology
                .newStream(streamId, tridentSpout)
                .parallelismHint(this.spoutConfig.getPartitionCount())
                // Count the tuples, then fold the partial counts into a running sum.
                .aggregate(new Count(), partialCount)
                .persistentAggregate(new MemoryMapState.Factory(), partialCount, new Sum(), count)
                .newValuesStream()
                // Log the updated total at most once every 10 seconds.
                .each(count, new LoggingFilter("got count: ", 10000));

        return tridentTopology.build();
    }

    /**
     * Entry point: delegates to {@code runScenario} with the command-line arguments.
     *
     * @param strArr command-line arguments, passed through unchanged
     * @throws Exception whatever {@code runScenario} throws
     */
    public static void main(String[] strArr) throws Exception {
        new TransactionalTridentEventCount().runScenario(strArr);
    }
}
