/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.datastream.impl.extension.eventtime.functions;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarksWithIdleness;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.api.common.watermark.WatermarkDeclaration;
import org.apache.flink.api.common.watermark.WatermarkManager;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.RelativeClock;

public class ExtractEventTimeProcessFunction<IN>
implements OneInputStreamProcessFunction<IN, IN>,
ProcessingTimeService.ProcessingTimeCallback {
    private final EventTimeWatermarkStrategy<IN> watermarkStrategy;
    private long currentMaxEventTime = Long.MIN_VALUE;
    private long lastEmittedEventTime = Long.MIN_VALUE;
    private long periodicTimerInterval = 0L;
    private boolean enableIdleStatus;
    private WatermarksWithIdleness.IdlenessTimer idlenessTimer;
    private boolean isIdleNow = false;
    private final long maxOutOfOrderTimeInMs;
    private ProcessingTimeService processingTimeService;
    private WatermarkManager watermarkManager;

    public ExtractEventTimeProcessFunction(EventTimeWatermarkStrategy<IN> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
        if (watermarkStrategy.getIdleTimeout().toMillis() > 0L) {
            this.enableIdleStatus = true;
        }
        this.maxOutOfOrderTimeInMs = watermarkStrategy.getMaxOutOfOrderTime().toMillis();
    }

    public void initEventTimeExtension(ExecutionConfig config, WatermarkManager watermarkManager, ProcessingTimeService processingTimeService) {
        this.processingTimeService = processingTimeService;
        this.watermarkManager = watermarkManager;
        if (this.enableIdleStatus) {
            this.idlenessTimer = new WatermarksWithIdleness.IdlenessTimer((RelativeClock)processingTimeService.getClock(), this.watermarkStrategy.getIdleTimeout());
        }
        boolean needRegisterTimer = this.watermarkStrategy.getGenerateMode() == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC || this.enableIdleStatus;
        this.periodicTimerInterval = config.getAutoWatermarkInterval();
        if (this.watermarkStrategy.getGenerateMode() == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC && !this.watermarkStrategy.getPeriodicWatermarkInterval().isZero()) {
            this.periodicTimerInterval = this.watermarkStrategy.getPeriodicWatermarkInterval().toMillis();
        }
        Preconditions.checkState((this.periodicTimerInterval > 0L ? 1 : 0) != 0, (Object)("Watermark interval " + this.periodicTimerInterval + " should large to 0."));
        if (needRegisterTimer) {
            processingTimeService.registerTimer(processingTimeService.getCurrentProcessingTime() + this.periodicTimerInterval, (ProcessingTimeService.ProcessingTimeCallback)this);
        }
    }

    public Set<? extends WatermarkDeclaration> declareWatermarks() {
        HashSet<Object> watermarkDeclarations = new HashSet<Object>();
        watermarkDeclarations.add(EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION);
        if (this.enableIdleStatus) {
            watermarkDeclarations.add(EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION);
        }
        return watermarkDeclarations;
    }

    public void processRecord(IN record, Collector<IN> output, PartitionedContext<IN> ctx) throws Exception {
        if (this.enableIdleStatus) {
            if (this.isIdleNow) {
                this.watermarkManager.emitWatermark((Watermark)EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false));
                this.isIdleNow = false;
            }
            this.idlenessTimer.activity();
        }
        long extractedEventTime = this.watermarkStrategy.getEventTimeExtractor().extractTimestamp(record);
        this.currentMaxEventTime = Math.max(this.currentMaxEventTime, extractedEventTime);
        output.collectAndOverwriteTimestamp(record, extractedEventTime);
        if (this.watermarkStrategy.getGenerateMode() == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT) {
            this.tryEmitEventTimeWatermark(ctx.getNonPartitionedContext().getWatermarkManager());
        }
    }

    public void onProcessingTime(long time) throws IOException, InterruptedException, Exception {
        if (this.enableIdleStatus && this.idlenessTimer.checkIfIdle()) {
            if (!this.isIdleNow) {
                this.watermarkManager.emitWatermark((Watermark)EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true));
                this.isIdleNow = true;
            }
        } else if (this.watermarkStrategy.getGenerateMode() == EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC) {
            this.tryEmitEventTimeWatermark(this.watermarkManager);
        }
        this.processingTimeService.registerTimer(time + this.periodicTimerInterval, (ProcessingTimeService.ProcessingTimeCallback)this);
    }

    private void tryEmitEventTimeWatermark(WatermarkManager watermarkManager) {
        if (this.currentMaxEventTime == Long.MIN_VALUE) {
            return;
        }
        long needEmittedEventTime = this.currentMaxEventTime - this.maxOutOfOrderTimeInMs;
        if (needEmittedEventTime > this.lastEmittedEventTime) {
            watermarkManager.emitWatermark((Watermark)EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(needEmittedEventTime));
            this.lastEmittedEventTime = needEmittedEventTime;
        }
    }
}

