/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.dsv2.eventtime;

import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListStateDeclaration;
import org.apache.flink.api.common.state.StateDeclaration;
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeinfo.TypeDescriptor;
import org.apache.flink.api.common.typeinfo.TypeDescriptors;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.connector.dsv2.WrappedSource;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.context.StateManager;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor;
import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.ParameterTool;

public class CountNewsClicks {
    /**
     * Builds and executes the "CountNewsClicks" job.
     *
     * <p>The job reads {@link NewsEvent}s from the bundled CSV resource
     * {@code datas/dsv2/eventtime/CountNewsClicksEvents.csv}, uses each event's
     * {@code timestamp} field as its event time, keys the stream by {@code newsId} and runs
     * {@link CountNewsClicksProcessFunction} on every key. Results go to a row-format file sink
     * when the {@code output} parameter is present, otherwise to a print sink.
     *
     * @param args program arguments; the optional {@code output} parameter is the target path of
     *     the file sink
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final boolean fileOutput = params.has("output");

        ExecutionEnvironment env = ExecutionEnvironment.getInstance();

        // Resolve the bundled sample input; fail with a descriptive message when it is missing.
        String inputFilePath =
                Objects.requireNonNull(
                                CountNewsClicks.class
                                        .getClassLoader()
                                        .getResource(
                                                "datas/dsv2/eventtime/CountNewsClicksEvents.csv"),
                                "Resource datas/dsv2/eventtime/CountNewsClicksEvents.csv"
                                        + " was not found on the classpath")
                        .getPath();
        CsvReaderFormat<NewsEvent> csvFormat = CsvReaderFormat.forPojo(NewsEvent.class);
        FileSource<NewsEvent> fileSource =
                FileSource.forRecordStreamFormat(csvFormat, new Path(inputFilePath)).build();
        NonKeyedPartitionStream<NewsEvent> source =
                env.fromSource(new WrappedSource<>(fileSource), "news source");

        NonKeyedPartitionStream<NewsClicks> clicksStream =
                source
                        // The timestamp field of each event is its event time; watermarks are
                        // generated periodically with the settings below.
                        .process(
                                EventTimeExtension.<NewsEvent>newWatermarkGeneratorBuilder(
                                                event -> event.timestamp)
                                        .periodicWatermark(Duration.ofMillis(200L))
                                        .withIdleness(Duration.ofSeconds(30L))
                                        .withMaxOutOfOrderTime(Duration.ofSeconds(10L))
                                        .buildAsProcessFunction())
                        // One partition of state and timers per news item.
                        .keyBy(event -> event.newsId)
                        .process(
                                EventTimeExtension.wrapProcessFunction(
                                        new CountNewsClicksProcessFunction()));

        if (fileOutput) {
            // Row-format file sink; part files roll at 1 MiB or after 10 seconds.
            clicksStream
                    .toSink(
                            new WrappedSink<>(
                                    FileSink.<NewsClicks>forRowFormat(
                                                    new Path(params.get("output")),
                                                    new SimpleStringEncoder<>())
                                            .withRollingPolicy(
                                                    DefaultRollingPolicy.builder()
                                                            .withMaxPartSize(
                                                                    MemorySize.ofMebiBytes(1L))
                                                            .withRolloverInterval(
                                                                    Duration.ofSeconds(10L))
                                                            .build())
                                            .build()))
                    .withName("output");
        } else {
            clicksStream.toSink(new WrappedSink<>(new PrintSink<>())).withName("print-sink");
        }

        env.execute("CountNewsClicks");
    }

    /**
     * Input record of the job. Instances are produced by the CSV reader format created in
     * {@code main} via {@code CsvReaderFormat.forPojo(NewsEvent.class)}.
     */
    public static class NewsEvent {
        /** Id of the news item; the stream is keyed by this field. */
        public long newsId;
        /** Timestamp of the event; used as the event time of the record. */
        public long timestamp;
        /** Whether this event is a publication or a click. */
        public NewsEventType type;
    }

    public static class CountNewsClicksProcessFunction
    implements OneInputEventTimeStreamProcessFunction<NewsEvent, NewsClicks> {
        private static final long ONE_HOUR_IN_MS = Duration.ofHours(1L).toMillis();
        private EventTimeManager eventTimeManager;
        private final ValueStateDeclaration<Long> publishTimeStateDeclaration = StateDeclarations.valueState((String)"publish_time", (TypeDescriptor)TypeDescriptors.LONG);
        private final ValueStateDeclaration<Long> clickCountStateDeclaration = StateDeclarations.valueState((String)"click_count", (TypeDescriptor)TypeDescriptors.LONG);
        private final ListStateDeclaration<Long> pendingClicksStateDeclaration = StateDeclarations.listState((String)"pending_clicks", (TypeDescriptor)TypeDescriptors.LONG);

        public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) {
            this.eventTimeManager = eventTimeManager;
        }

        public Set<StateDeclaration> usesStates() {
            return Set.of(this.publishTimeStateDeclaration, this.clickCountStateDeclaration, this.pendingClicksStateDeclaration);
        }

        public void processRecord(NewsEvent record, Collector<NewsClicks> output, PartitionedContext<NewsClicks> ctx) throws Exception {
            if (record.type == NewsEventType.PUBLISH) {
                long publishTime = record.timestamp;
                if (this.eventTimeManager.currentTime() > publishTime + ONE_HOUR_IN_MS) {
                    return;
                }
                ctx.getStateManager().getState(this.publishTimeStateDeclaration).update((Object)publishTime);
                Iterable pendingClicks = (Iterable)ctx.getStateManager().getState(this.pendingClicksStateDeclaration).get();
                if (pendingClicks != null) {
                    long addition = 0L;
                    for (Long clickTime : pendingClicks) {
                        if (clickTime > publishTime + ONE_HOUR_IN_MS) continue;
                        ++addition;
                    }
                    this.addClickCount(ctx.getStateManager(), addition);
                }
                this.eventTimeManager.registerTimer(publishTime + ONE_HOUR_IN_MS);
            } else {
                Long publishTime = (Long)ctx.getStateManager().getState(this.publishTimeStateDeclaration).value();
                if (publishTime == null) {
                    ctx.getStateManager().getState(this.pendingClicksStateDeclaration).add((Object)record.timestamp);
                    this.eventTimeManager.registerTimer(record.timestamp + ONE_HOUR_IN_MS);
                } else if (record.timestamp <= publishTime + ONE_HOUR_IN_MS) {
                    this.addClickCount(ctx.getStateManager(), 1L);
                }
            }
        }

        public void onEventTimer(long timestamp, Collector<NewsClicks> output, PartitionedContext<NewsClicks> ctx) throws Exception {
            Long publishTime = (Long)ctx.getStateManager().getState(this.publishTimeStateDeclaration).value();
            if (publishTime == null) {
                this.clearAllState(ctx.getStateManager());
                return;
            }
            long newsId = (Long)ctx.getStateManager().getCurrentKey();
            Long clickCount = (Long)ctx.getStateManager().getState(this.clickCountStateDeclaration).value();
            clickCount = clickCount == null ? 0L : clickCount;
            output.collect((Object)new NewsClicks(newsId, clickCount));
            this.clearAllState(ctx.getStateManager());
        }

        private void addClickCount(StateManager stateManager, long addition) throws Exception {
            ValueState clickCountState = stateManager.getState(this.clickCountStateDeclaration);
            Long previousCount = (Long)clickCountState.value();
            long newlyCount = previousCount == null ? addition : previousCount + addition;
            clickCountState.update((Object)newlyCount);
        }

        private void clearAllState(StateManager stateManager) throws Exception {
            stateManager.getState(this.publishTimeStateDeclaration).clear();
            stateManager.getState(this.clickCountStateDeclaration).clear();
            stateManager.getState(this.pendingClicksStateDeclaration).clear();
        }
    }

    /**
     * Result record of the job: the number of clicks counted for one news item. Its
     * {@link #toString()} form, {@code newsId,clicks}, is what string-based sinks write.
     */
    public static class NewsClicks {
        /** Id of the news item. */
        public long newsId;
        /** Number of clicks counted for the news item. */
        public long clicks;

        /**
         * @param newsId id of the news item
         * @param clicks number of clicks counted for the news item
         */
        public NewsClicks(long newsId, long clicks) {
            this.newsId = newsId;
            this.clicks = clicks;
        }

        /**
         * Formats the record as {@code newsId,clicks} using plain decimal digits.
         *
         * @return the comma-separated representation of this record
         */
        @Override
        public String toString() {
            // Concatenation rather than String.format("%d,%d", ...): %d formats with the default
            // locale and may localize digits, which would make the output machine-unreadable.
            return newsId + "," + clicks;
        }
    }

    /** Kind of a {@link NewsEvent}. */
    public static enum NewsEventType {
        /** The news item was published; the event's timestamp is taken as the publish time. */
        PUBLISH,
        /** The news item was clicked; the event's timestamp is taken as the click time. */
        CLICK;

    }
}

