/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

@Deprecated
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
extends StreamTask<OUT, OP> {
    private final LegacySourceFunctionThread sourceThread;
    private final Object lock;
    private volatile boolean externallyInducedCheckpoints;
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private volatile FinishingReason finishingReason = FinishingReason.END_OF_DATA;

    public SourceStreamTask(Environment env) throws Exception {
        this(env, new Object());
    }

    private SourceStreamTask(Environment env, Object lock) throws Exception {
        super(env, null, (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE, StreamTaskActionExecutor.synchronizedExecutor(lock));
        this.lock = Preconditions.checkNotNull((Object)lock);
        this.sourceThread = new LegacySourceFunctionThread();
        this.getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
    }

    @Override
    protected void init() {
        SourceFunction source = (SourceFunction)((StreamSource)this.mainOperator).getUserFunction();
        if (source instanceof ExternallyInducedSource) {
            this.externallyInducedCheckpoints = true;
            ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger(){

                @Override
                public void triggerCheckpoint(long checkpointId) throws FlinkException {
                    CheckpointOptions checkpointOptions = CheckpointOptions.forConfig(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), SourceStreamTask.this.configuration.isExactlyOnceCheckpointMode(), SourceStreamTask.this.configuration.isUnalignedCheckpointsEnabled(), SourceStreamTask.this.configuration.getAlignedCheckpointTimeout().toMillis());
                    long timestamp = System.currentTimeMillis();
                    CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp, timestamp);
                    try {
                        SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions).get();
                    }
                    catch (RuntimeException e) {
                        throw e;
                    }
                    catch (Exception e) {
                        throw new FlinkException(e.getMessage(), (Throwable)e);
                    }
                }
            };
            ((ExternallyInducedSource)source).setCheckpointTrigger(triggerHook);
        }
        this.getEnvironment().getMetricGroup().getIOMetricGroup().gauge("checkpointStartDelayNanos", this::getAsyncCheckpointStartDelayNanos);
        this.recordWriter.setMaxOverdraftBuffersPerGate(0);
    }

    @Override
    protected void advanceToEndOfEventTime() throws Exception {
        this.operatorChain.getMainOperatorOutput().emitWatermark(Watermark.MAX_WATERMARK);
    }

    @Override
    protected void cleanUpInternal() {
    }

    @Override
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        controller.suspendDefaultAction();
        this.sourceThread.setTaskDescription(this.getName());
        this.sourceThread.start();
        this.sourceThread.getCompletionFuture().whenComplete((ignore, sourceThreadThrowable) -> {
            if (sourceThreadThrowable != null) {
                this.mailboxProcessor.reportThrowable((Throwable)sourceThreadThrowable);
            } else {
                this.notifyEndOfData();
                this.mailboxProcessor.suspend();
            }
        });
    }

    @Override
    protected void cancelTask() {
        if (this.stopped.compareAndSet(false, true)) {
            this.cancelOperator();
        }
    }

    private void cancelOperator() {
        try {
            if (this.mainOperator != null) {
                ((StreamSource)this.mainOperator).cancel();
            }
        }
        finally {
            if (this.sourceThread.isAlive()) {
                this.interruptSourceThread();
            } else if (!this.sourceThread.getCompletionFuture().isDone()) {
                this.sourceThread.getCompletionFuture().complete(null);
            }
        }
    }

    @Override
    public void maybeInterruptOnCancel(Thread toInterrupt, @Nullable String taskName, @Nullable Long timeout) {
        super.maybeInterruptOnCancel(toInterrupt, taskName, timeout);
        this.interruptSourceThread();
    }

    private void interruptSourceThread() {
        if (this.operatorChain != null && this.operatorChain.isTaskDeployedAsFinished()) {
            return;
        }
        if (this.sourceThread.isAlive()) {
            this.sourceThread.interrupt();
        }
    }

    @Override
    protected CompletableFuture<Void> getCompletionFuture() {
        return this.sourceThread.getCompletionFuture();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        if (!this.externallyInducedCheckpoints) {
            if (this.isSynchronousSavepoint(checkpointOptions.getCheckpointType())) {
                return this.triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
            }
            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
        }
        if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
            throw new IllegalStateException("Using externally induced sources, we can not enforce taking a full checkpoint.If you are restoring from a snapshot in NO_CLAIM mode, please use CLAIM mode.");
        }
        Object object = this.lock;
        synchronized (object) {
            return CompletableFuture.completedFuture(this.isRunning());
        }
    }

    private boolean isSynchronousSavepoint(SnapshotType snapshotType) {
        return snapshotType.isSavepoint() && ((SavepointType)snapshotType).isSynchronous();
    }

    private CompletableFuture<Boolean> triggerStopWithSavepointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        this.mainMailboxExecutor.execute(() -> this.stopOperatorForStopWithSavepoint(checkpointMetaData.getCheckpointId(), ((SavepointType)checkpointOptions.getCheckpointType()).shouldDrain()), "stop legacy source for stop-with-savepoint --drain");
        return this.sourceThread.getCompletionFuture().thenCompose(ignore -> super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions));
    }

    private void stopOperatorForStopWithSavepoint(long checkpointId, boolean drain) {
        this.setSynchronousSavepoint(checkpointId);
        FinishingReason finishingReason = this.finishingReason = drain ? FinishingReason.STOP_WITH_SAVEPOINT_DRAIN : FinishingReason.STOP_WITH_SAVEPOINT_NO_DRAIN;
        if (this.mainOperator != null) {
            ((StreamSource)this.mainOperator).stop();
        }
    }

    @Override
    protected void declineCheckpoint(long checkpointId) {
        if (!this.externallyInducedCheckpoints) {
            super.declineCheckpoint(checkpointId);
        }
    }

    private static enum FinishingReason {
        END_OF_DATA(StopMode.DRAIN),
        STOP_WITH_SAVEPOINT_DRAIN(StopMode.DRAIN),
        STOP_WITH_SAVEPOINT_NO_DRAIN(StopMode.NO_DRAIN);

        private final StopMode stopMode;

        private FinishingReason(StopMode stopMode) {
            this.stopMode = stopMode;
        }

        StopMode toStopMode() {
            return this.stopMode;
        }
    }

    private class LegacySourceFunctionThread
    extends Thread {
        private final CompletableFuture<Void> completionFuture = new CompletableFuture();

        LegacySourceFunctionThread() {
        }

        @Override
        public void run() {
            try {
                if (!SourceStreamTask.this.operatorChain.isTaskDeployedAsFinished()) {
                    StreamTask.LOG.debug("Legacy source {} skip execution since the task is finished on restore", (Object)SourceStreamTask.this.getTaskNameWithSubtaskAndId());
                    ((StreamSource)SourceStreamTask.this.mainOperator).run(SourceStreamTask.this.lock, SourceStreamTask.this.operatorChain);
                }
                this.completeProcessing();
                this.completionFuture.complete(null);
            }
            catch (Throwable t) {
                if (SourceStreamTask.this.isCanceled() && ExceptionUtils.findThrowable((Throwable)t, InterruptedException.class).isPresent()) {
                    this.completionFuture.completeExceptionally(new CancelTaskException(t));
                }
                this.completionFuture.completeExceptionally(t);
            }
        }

        private void completeProcessing() throws InterruptedException, ExecutionException {
            if (!SourceStreamTask.this.isCanceled() && !SourceStreamTask.this.isFailing()) {
                SourceStreamTask.this.mainMailboxExecutor.submit(() -> {
                    StopMode stopMode = SourceStreamTask.this.finishingReason.toStopMode();
                    if (stopMode == StopMode.DRAIN) {
                        SourceStreamTask.this.operatorChain.endInput(1);
                    }
                    SourceStreamTask.this.endData(stopMode);
                }, "SourceStreamTask finished processing data.").get();
            }
        }

        public void setTaskDescription(String taskDescription) {
            this.setName("Legacy Source Thread - " + taskDescription);
        }

        CompletableFuture<Void> getCompletionFuture() {
            return SourceStreamTask.this.isFailing() && !this.isAlive() ? CompletableFuture.completedFuture(null) : this.completionFuture;
        }
    }
}

