/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.security.FlinkSecurityManager;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.InitializationStatus;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ContainingTaskDetails;
import org.apache.flink.streaming.runtime.tasks.FinishedOperatorChain;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceFactory;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceImpl;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerException;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxMetricsController;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
implements TaskInvokable,
CheckpointableTask,
CoordinatedTask,
AsyncExceptionHandler,
ContainingTaskDetails {
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    private final StreamTaskActionExecutor actionExecutor;
    @Nullable
    protected StreamInputProcessor inputProcessor;
    protected OP mainOperator;
    protected OperatorChain<OUT, OP> operatorChain;
    protected final StreamConfig configuration;
    protected final StateBackend stateBackend;
    protected final CheckpointStorage checkpointStorage;
    private final SubtaskCheckpointCoordinator subtaskCheckpointCoordinator;
    protected final TimerService timerService;
    protected final TimerService systemTimerService;
    private final CloseableRegistry cancelables = new CloseableRegistry();
    private final AutoCloseableRegistry resourceCloser;
    private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
    private volatile boolean isRunning;
    private volatile boolean isRestoring;
    private volatile boolean canceled;
    private volatile boolean failing;
    private boolean finishedOperators;
    private boolean closedOperators;
    private final ExecutorService asyncOperationsThreadPool;
    protected final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
    protected final MailboxProcessor mailboxProcessor;
    final MailboxExecutor mainMailboxExecutor;
    private final ExecutorService channelIOExecutor;
    private Long syncSavepoint = null;
    private Long finalCheckpointMinId = null;
    private final CompletableFuture<Void> finalCheckpointCompleted = new CompletableFuture();
    private long latestReportCheckpointId = -1L;
    private long latestAsyncCheckpointStartDelayNanos;
    private volatile boolean endOfDataReceived = false;
    private final long bufferDebloatPeriod;
    private final Environment environment;
    private final Object shouldInterruptOnCancelLock = new Object();
    @GuardedBy(value="shouldInterruptOnCancelLock")
    private boolean shouldInterruptOnCancel = true;
    @Nullable
    private final AvailabilityProvider changelogWriterAvailabilityProvider;
    private long initializeStateEndTs;

    protected StreamTask(Environment env) throws Exception {
        this(env, null);
    }

    protected StreamTask(Environment env, @Nullable TimerService timerService) throws Exception {
        this(env, timerService, (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE);
    }

    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception {
        this(environment, timerService, uncaughtExceptionHandler, StreamTaskActionExecutor.IMMEDIATE);
    }

    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor) throws Exception {
        this(environment, timerService, uncaughtExceptionHandler, actionExecutor, new TaskMailboxImpl(Thread.currentThread()));
    }

    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor, TaskMailbox mailbox) throws Exception {
        this.resourceCloser = new AutoCloseableRegistry();
        try {
            this.environment = environment;
            this.configuration = new StreamConfig(environment.getTaskConfiguration());
            MailboxMetricsController mailboxMetricsControl = new MailboxMetricsController(environment.getMetricGroup().getIOMetricGroup().getMailboxLatency(), environment.getMetricGroup().getIOMetricGroup().getNumMailsProcessedCounter());
            environment.getMetricGroup().getIOMetricGroup().registerMailboxSizeSupplier(() -> mailbox.size());
            this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
            this.resourceCloser.registerCloseable((AutoCloseable)this.mailboxProcessor);
            this.channelIOExecutor = MdcUtils.scopeToJob((JobID)environment.getJobID(), (ExecutorService)Executors.newSingleThreadExecutor((ThreadFactory)new ExecutorThreadFactory("channel-state-unspilling")));
            this.resourceCloser.registerCloseable(this.channelIOExecutor::shutdown);
            this.recordWriter = StreamTask.createRecordWriterDelegate(this.configuration, environment);
            this.resourceCloser.registerCloseable(this::releaseOutputResources);
            this.resourceCloser.registerCloseable(this::closeAllOperators);
            this.resourceCloser.registerCloseable(this::cleanUpInternal);
            this.actionExecutor = (StreamTaskActionExecutor)Preconditions.checkNotNull((Object)actionExecutor);
            this.mainMailboxExecutor = this.mailboxProcessor.getMainMailboxExecutor();
            this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
            this.asyncOperationsThreadPool = MdcUtils.scopeToJob((JobID)this.getEnvironment().getJobID(), (ExecutorService)new ThreadPoolExecutor(0, this.configuration.getMaxConcurrentCheckpoints() + 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler)));
            this.resourceCloser.registerCloseable(this::shutdownAsyncThreads);
            this.resourceCloser.registerCloseable((AutoCloseable)this.cancelables);
            environment.setMainMailboxExecutor(this.mainMailboxExecutor);
            environment.setAsyncOperationsThreadPool(this.asyncOperationsThreadPool);
            this.stateBackend = this.createStateBackend();
            this.checkpointStorage = this.createCheckpointStorage(this.stateBackend);
            this.changelogWriterAvailabilityProvider = environment.getTaskStateManager().getStateChangelogStorage() == null ? null : environment.getTaskStateManager().getStateChangelogStorage().getAvailabilityProvider();
            CheckpointStorageAccess checkpointStorageAccess = this.checkpointStorage.createCheckpointStorage(this.getEnvironment().getJobID());
            checkpointStorageAccess = this.tryApplyFileMergingCheckpoint(checkpointStorageAccess, environment.getTaskStateManager().getFileMergingSnapshotManager());
            environment.setCheckpointStorageAccess(checkpointStorageAccess);
            this.timerService = timerService == null ? this.createTimerService("Time Trigger for " + this.getName()) : timerService;
            this.systemTimerService = this.createTimerService("System Time Trigger for " + this.getName());
            CheckpointStorageAccess finalCheckpointStorageAccess = checkpointStorageAccess;
            ChannelStateWriter channelStateWriter = this.configuration.isUnalignedCheckpointsEnabled() ? SubtaskCheckpointCoordinatorImpl.openChannelStateWriter(this.getName(), (SupplierWithException<CheckpointStorageWorkerView, ? extends IOException>)((SupplierWithException)() -> {
                if (finalCheckpointStorageAccess instanceof FsMergingCheckpointStorageAccess) {
                    return finalCheckpointStorageAccess;
                }
                return this.checkpointStorage.createCheckpointStorage(this.getEnvironment().getJobID());
            }), environment, this.configuration.getMaxSubtasksPerChannelStateFile()) : ChannelStateWriter.NO_OP;
            this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl(checkpointStorageAccess, this.getName(), actionExecutor, this.getAsyncOperationsThreadPool(), environment, this, (BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException>)((BiFunctionWithException)this::prepareInputSnapshot), this.configuration.getMaxConcurrentCheckpoints(), channelStateWriter, (Boolean)this.configuration.getConfiguration().get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH), BarrierAlignmentUtil.createRegisterTimerCallback(this.mainMailboxExecutor, this.systemTimerService), environment.getTaskStateManager().getFileMergingSnapshotManager());
            this.resourceCloser.registerCloseable(this.subtaskCheckpointCoordinator::close);
            this.resourceCloser.registerCloseable(this::tryShutdownTimerService);
            this.injectChannelStateWriterIntoChannels();
            environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
            Configuration taskManagerConf = environment.getTaskManagerInfo().getConfiguration();
            this.bufferDebloatPeriod = ((Duration)taskManagerConf.get(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD)).toMillis();
            mailboxMetricsControl.setupLatencyMeasurement(this.systemTimerService, this.mainMailboxExecutor);
        }
        catch (Exception ex) {
            try {
                this.resourceCloser.close();
            }
            catch (Throwable throwable) {
                ex.addSuppressed(throwable);
            }
            throw ex;
        }
    }

    private CheckpointStorageAccess tryApplyFileMergingCheckpoint(CheckpointStorageAccess checkpointStorageAccess, @Nullable FileMergingSnapshotManager fileMergingSnapshotManager) {
        if (fileMergingSnapshotManager == null) {
            return checkpointStorageAccess;
        }
        try {
            CheckpointStorageAccess mergingCheckpointStorageAccess = (CheckpointStorageAccess)checkpointStorageAccess.toFileMergingStorage(fileMergingSnapshotManager, this.environment);
            mergingCheckpointStorageAccess.initializeBaseLocationsForCheckpoint();
            if (mergingCheckpointStorageAccess instanceof FsMergingCheckpointStorageAccess) {
                this.resourceCloser.registerCloseable(() -> ((FsMergingCheckpointStorageAccess)mergingCheckpointStorageAccess).close());
            }
            return mergingCheckpointStorageAccess;
        }
        catch (IOException e) {
            LOG.warn("Initiating FsMergingCheckpointStorageAccess failed with exception: {}, falling back to original checkpoint storage access {}.", new Object[]{e.getMessage(), checkpointStorageAccess.getClass(), e});
            return checkpointStorageAccess;
        }
    }

    private TimerService createTimerService(String timerThreadName) {
        DispatcherThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, timerThreadName);
        return new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory);
    }

    private void injectChannelStateWriterIntoChannels() {
        Environment env = this.getEnvironment();
        ChannelStateWriter channelStateWriter = this.subtaskCheckpointCoordinator.getChannelStateWriter();
        for (IndexedInputGate indexedInputGate : env.getAllInputGates()) {
            indexedInputGate.setChannelStateWriter(channelStateWriter);
        }
        for (AvailabilityProvider availabilityProvider : env.getAllWriters()) {
            if (!(availabilityProvider instanceof ChannelStateHolder)) continue;
            ((ChannelStateHolder)((Object)availabilityProvider)).setChannelStateWriter(channelStateWriter);
        }
    }

    private CompletableFuture<Void> prepareInputSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
        if (this.inputProcessor == null) {
            return FutureUtils.completedVoidFuture();
        }
        return this.inputProcessor.prepareSnapshot(channelStateWriter, checkpointId);
    }

    SubtaskCheckpointCoordinator getCheckpointCoordinator() {
        return this.subtaskCheckpointCoordinator;
    }

    protected abstract void init() throws Exception;

    protected void cancelTask() throws Exception {
    }

    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        CompletableFuture<?> resumeFuture;
        GaugePeriodTimer timer;
        DataInputStatus status = this.inputProcessor.processInput();
        switch (status) {
            case MORE_AVAILABLE: {
                if (!this.taskIsAvailable()) break;
                return;
            }
            case NOTHING_AVAILABLE: {
                break;
            }
            case END_OF_RECOVERY: {
                throw new IllegalStateException("We should not receive this event here.");
            }
            case STOPPED: {
                this.endData(StopMode.NO_DRAIN);
                return;
            }
            case END_OF_DATA: {
                this.endData(StopMode.DRAIN);
                this.notifyEndOfData();
                return;
            }
            case END_OF_INPUT: {
                controller.suspendDefaultAction();
                this.mailboxProcessor.suspend();
                return;
            }
        }
        TaskIOMetricGroup ioMetrics = this.getEnvironment().getMetricGroup().getIOMetricGroup();
        if (!this.recordWriter.isAvailable()) {
            timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
            resumeFuture = this.recordWriter.getAvailableFuture();
        } else if (!this.inputProcessor.isAvailable()) {
            timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
            resumeFuture = this.inputProcessor.getAvailableFuture();
        } else if (this.changelogWriterAvailabilityProvider != null && !this.changelogWriterAvailabilityProvider.isAvailable()) {
            timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
            resumeFuture = this.changelogWriterAvailabilityProvider.getAvailableFuture();
        } else {
            return;
        }
        FutureUtils.assertNoException((CompletableFuture)resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
    }

    protected void endData(StopMode mode) throws Exception {
        if (mode == StopMode.DRAIN) {
            this.advanceToEndOfEventTime();
        }
        this.operatorChain.finishOperators(this.actionExecutor, mode);
        this.finishedOperators = true;
        for (ResultPartitionWriter partitionWriter : this.getEnvironment().getAllWriters()) {
            partitionWriter.notifyEndOfData(mode);
        }
        this.endOfDataReceived = true;
    }

    protected void notifyEndOfData() {
        this.environment.getTaskManagerActions().notifyEndOfData(this.environment.getExecutionId());
    }

    protected void setSynchronousSavepoint(long checkpointId) {
        Preconditions.checkState((this.syncSavepoint == null || this.syncSavepoint == checkpointId ? 1 : 0) != 0, (Object)"at most one stop-with-savepoint checkpoint at a time is allowed");
        this.syncSavepoint = checkpointId;
    }

    @VisibleForTesting
    OptionalLong getSynchronousSavepointId() {
        if (this.syncSavepoint != null) {
            return OptionalLong.of(this.syncSavepoint);
        }
        return OptionalLong.empty();
    }

    private boolean isCurrentSyncSavepoint(long checkpointId) {
        return this.syncSavepoint != null && this.syncSavepoint == checkpointId;
    }

    protected void advanceToEndOfEventTime() throws Exception {
    }

    public StreamTaskStateInitializer createStreamTaskStateInitializer(SubTaskInitializationMetricsBuilder initializationMetrics) {
        InternalTimeServiceManager.Provider timerServiceProvider = this.configuration.getTimerServiceProvider(this.getUserCodeClassLoader());
        return new StreamTaskStateInitializerImpl(this.getEnvironment(), this.stateBackend, initializationMetrics, TtlTimeProvider.DEFAULT, timerServiceProvider != null ? timerServiceProvider : InternalTimeServiceManagerImpl::create, () -> this.canceled);
    }

    protected Counter setupNumRecordsInCounter(StreamOperator streamOperator) {
        try {
            return streamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        }
        catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
            return new SimpleCounter();
        }
    }

    @Override
    public final void restore() throws Exception {
        this.restoreInternal();
    }

    void restoreInternal() throws Exception {
        if (this.isRunning) {
            LOG.debug("Re-restore attempt rejected.");
            return;
        }
        this.isRestoring = true;
        this.closedOperators = false;
        this.getEnvironment().getMetricGroup().getIOMetricGroup().markTaskInitializationStarted();
        LOG.debug("Initializing {}.", (Object)this.getName());
        SubTaskInitializationMetricsBuilder initializationMetrics = new SubTaskInitializationMetricsBuilder(SystemClock.getInstance().absoluteTimeMillis());
        try {
            this.operatorChain = this.getEnvironment().getTaskStateManager().isTaskDeployedAsFinished() ? new FinishedOperatorChain(this, this.recordWriter) : new RegularOperatorChain(this, this.recordWriter);
            this.mainOperator = this.operatorChain.getMainOperator();
            this.getEnvironment().getTaskStateManager().getRestoreCheckpointId().ifPresent(restoreId -> {
                this.latestReportCheckpointId = restoreId;
            });
            this.init();
            this.configuration.clearInitialConfigs();
            this.ensureNotCanceled();
            LOG.debug("Invoking {}", (Object)this.getName());
            CompletableFuture allGatesRecoveredFuture = this.actionExecutor.call(() -> this.restoreStateAndGates(initializationMetrics));
            this.mailboxProcessor.runMailboxLoop();
            initializationMetrics.addDurationMetric("GateRestoreDurationMs", SystemClock.getInstance().absoluteTimeMillis() - this.initializeStateEndTs);
            this.ensureNotCanceled();
            Preconditions.checkState((boolean)allGatesRecoveredFuture.isDone(), (Object)"Mailbox loop interrupted before recovery was finished.");
            this.channelIOExecutor.shutdown();
            this.isRunning = true;
            this.isRestoring = false;
            initializationMetrics.setStatus(InitializationStatus.COMPLETED);
        }
        finally {
            this.environment.getTaskStateManager().reportInitializationMetrics(initializationMetrics.build());
        }
    }

    private CompletableFuture<Void> restoreStateAndGates(SubTaskInitializationMetricsBuilder initializationMetrics) throws Exception {
        long mailboxStartTs = SystemClock.getInstance().absoluteTimeMillis();
        initializationMetrics.addDurationMetric("MailboxStartDurationMs", mailboxStartTs - initializationMetrics.getInitializationStartTs());
        SequentialChannelStateReader reader = this.getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
        reader.readOutputData(this.getEnvironment().getAllWriters(), !this.configuration.isGraphContainingLoops());
        long readOutputDataTs = SystemClock.getInstance().absoluteTimeMillis();
        initializationMetrics.addDurationMetric("ReadOutputDataDurationMs", readOutputDataTs - mailboxStartTs);
        this.operatorChain.initializeStateAndOpenOperators(this.createStreamTaskStateInitializer(initializationMetrics));
        this.initializeStateEndTs = SystemClock.getInstance().absoluteTimeMillis();
        initializationMetrics.addDurationMetric("InitializeStateDurationMs", this.initializeStateEndTs - readOutputDataTs);
        IndexedInputGate[] inputGates = this.getEnvironment().getAllInputGates();
        this.channelIOExecutor.execute(() -> {
            try {
                reader.readInputData(inputGates);
            }
            catch (Exception e) {
                this.asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
            }
        });
        ArrayList<CompletableFuture<Void>> recoveredFutures = new ArrayList<CompletableFuture<Void>>(inputGates.length);
        for (IndexedInputGate inputGate : inputGates) {
            recoveredFutures.add(inputGate.getStateConsumedFuture());
            inputGate.getStateConsumedFuture().thenRun(() -> this.mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request partitions"));
        }
        return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0])).thenRun(this.mailboxProcessor::suspend);
    }

    private void ensureNotCanceled() {
        if (this.canceled) {
            throw new CancelTaskException();
        }
    }

    @Override
    public final void invoke() throws Exception {
        if (!this.isRunning) {
            LOG.debug("Restoring during invoke will be called.");
            this.restoreInternal();
        }
        this.ensureNotCanceled();
        this.scheduleBufferDebloater();
        this.getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();
        this.runMailboxLoop();
        this.ensureNotCanceled();
        this.afterInvoke();
    }

    private void scheduleBufferDebloater() {
        if (this.getEnvironment().getAllInputGates().length == 0 || !((Boolean)this.environment.getTaskManagerInfo().getConfiguration().get(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)).booleanValue()) {
            return;
        }
        this.systemTimerService.registerTimer(this.systemTimerService.getCurrentProcessingTime() + this.bufferDebloatPeriod, timestamp -> this.mainMailboxExecutor.execute(() -> {
            this.debloat();
            this.scheduleBufferDebloater();
        }, "Buffer size recalculation"));
    }

    @VisibleForTesting
    void debloat() {
        for (IndexedInputGate inputGate : this.environment.getAllInputGates()) {
            inputGate.triggerDebloating();
        }
    }

    @VisibleForTesting
    public boolean runSingleMailboxLoop() throws Exception {
        return this.mailboxProcessor.runSingleMailboxLoop();
    }

    @VisibleForTesting
    public boolean runMailboxStep() throws Exception {
        return this.mailboxProcessor.runMailboxStep();
    }

    @VisibleForTesting
    public boolean isMailboxLoopRunning() {
        return this.mailboxProcessor.isMailboxLoopRunning();
    }

    public void runMailboxLoop() throws Exception {
        this.mailboxProcessor.runMailboxLoop();
    }

    protected void afterInvoke() throws Exception {
        LOG.debug("Finished task {}", (Object)this.getName());
        ((CompletableFuture)this.getCompletionFuture().exceptionally(unused -> null)).join();
        HashSet<CompletableFuture<Void>> terminationConditions = new HashSet<CompletableFuture<Void>>();
        if (this.endOfDataReceived && this.areCheckpointsWithFinishedTasksEnabled()) {
            LOG.debug("Waiting for all the records processed by the downstream tasks.");
            for (ResultPartitionWriter partitionWriter : this.getEnvironment().getAllWriters()) {
                terminationConditions.add(partitionWriter.getAllDataProcessedFuture());
            }
            terminationConditions.add(this.finalCheckpointCompleted);
        }
        if (this.syncSavepoint != null) {
            terminationConditions.add(this.finalCheckpointCompleted);
        }
        FutureUtils.waitForAll(terminationConditions).thenRun(this.mailboxProcessor::allActionsCompleted);
        this.mailboxProcessor.runMailboxLoop();
        this.actionExecutor.runThrowing(() -> {
            this.timerService.quiesce().get();
            this.systemTimerService.quiesce().get();
            this.mailboxProcessor.prepareClose();
        });
        this.mailboxProcessor.drain();
        this.actionExecutor.runThrowing(() -> {
            this.isRunning = false;
        });
        LOG.debug("Finished operators for task {}", (Object)this.getName());
        this.operatorChain.flushOutputs();
        if (this.areCheckpointsWithFinishedTasksEnabled()) {
            this.subtaskCheckpointCoordinator.waitForPendingCheckpoints();
            LOG.debug("All pending checkpoints are finished");
        }
        this.disableInterruptOnCancel();
        this.closeAllOperators();
    }

    private boolean areCheckpointsWithFinishedTasksEnabled() {
        return (Boolean)this.configuration.getConfiguration().get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH) != false && this.configuration.isCheckpointingEnabled();
    }

    @Override
    public final void cleanUp(Throwable throwable) throws Exception {
        LOG.debug("Cleanup StreamTask (operators closed: {}, cancelled: {})", (Object)this.closedOperators, (Object)this.canceled);
        this.failing = !this.canceled && throwable != null;
        Exception cancelException = null;
        if (throwable != null) {
            try {
                this.cancelTask();
            }
            catch (Throwable t) {
                cancelException = t instanceof Exception ? (Exception)t : new Exception(t);
            }
        }
        this.disableInterruptOnCancel();
        ((CompletableFuture)this.getCompletionFuture().exceptionally(unused -> null)).join();
        this.isRunning = false;
        Thread.interrupted();
        try {
            this.resourceCloser.close();
        }
        catch (Throwable t) {
            Exception e = t instanceof Exception ? (Exception)t : new Exception(t);
            throw (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)cancelException);
        }
    }

    protected void cleanUpInternal() throws Exception {
        if (this.inputProcessor != null) {
            this.inputProcessor.close();
        }
    }

    protected CompletableFuture<Void> getCompletionFuture() {
        return FutureUtils.completedVoidFuture();
    }

    @Override
    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            this.cancelTask();
        }
        finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
            this.getCompletionFuture().whenComplete((unusedResult, unusedError) -> {
                this.mailboxProcessor.allActionsCompleted();
                try {
                    this.subtaskCheckpointCoordinator.cancel();
                    this.cancelables.close();
                }
                catch (IOException e) {
                    throw new CompletionException(e);
                }
            });
        }
    }

    public MailboxExecutorFactory getMailboxExecutorFactory() {
        return this.mailboxProcessor::getMailboxExecutor;
    }

    public boolean hasMail() {
        return this.mailboxProcessor.hasMail();
    }

    private boolean taskIsAvailable() {
        return this.recordWriter.isAvailable() && (this.changelogWriterAvailabilityProvider == null || this.changelogWriterAvailabilityProvider.isAvailable());
    }

    public CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords() {
        return () -> !this.mailboxProcessor.hasMail() && this.taskIsAvailable();
    }

    public final boolean isRunning() {
        return this.isRunning;
    }

    public final boolean isCanceled() {
        return this.canceled;
    }

    public final boolean isFailing() {
        return this.failing;
    }

    private void shutdownAsyncThreads() throws Exception {
        if (!this.asyncOperationsThreadPool.isShutdown()) {
            this.asyncOperationsThreadPool.shutdownNow();
        }
    }

    private void releaseOutputResources() throws Exception {
        if (this.operatorChain != null) {
            this.actionExecutor.run(() -> this.operatorChain.close());
        } else {
            this.recordWriter.close();
        }
    }

    private void closeAllOperators() throws Exception {
        if (this.operatorChain != null && !this.closedOperators) {
            this.closedOperators = true;
            this.operatorChain.closeAllOperators();
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (!this.timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            this.timerService.shutdownService();
        }
        if (!this.systemTimerService.isTerminated()) {
            LOG.info("System timer service is shutting down.");
            this.systemTimerService.shutdownService();
        }
        this.cancelables.close();
    }

    boolean isSerializingTimestamps() {
        return true;
    }

    public final String getName() {
        return this.getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    String getTaskNameWithSubtaskAndId() {
        return this.getEnvironment().getTaskInfo().getTaskNameWithSubtasks() + " (" + this.getEnvironment().getExecutionId() + ")";
    }

    public CheckpointStorageWorkerView getCheckpointStorage() {
        return this.subtaskCheckpointCoordinator.getCheckpointStorage();
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        this.checkForcedFullSnapshotSupport(checkpointOptions);
        CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
        this.mainMailboxExecutor.execute(() -> {
            try {
                boolean noUnfinishedInputGates = Arrays.stream(this.getEnvironment().getAllInputGates()).allMatch(InputGate::isFinished);
                if (noUnfinishedInputGates) {
                    result.complete(this.triggerCheckpointAsyncInMailbox(checkpointMetaData, checkpointOptions));
                } else {
                    result.complete(this.triggerUnfinishedChannelsCheckpoint(checkpointMetaData, checkpointOptions));
                }
            }
            catch (Exception ex) {
                result.completeExceptionally(ex);
                throw ex;
            }
        }, "checkpoint %s with %s", new Object[]{checkpointMetaData, checkpointOptions});
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean triggerCheckpointAsyncInMailbox(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            this.latestAsyncCheckpointStartDelayNanos = 1000000L * Math.max(0L, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
            CheckpointMetricsBuilder checkpointMetrics = new CheckpointMetricsBuilder().setAlignmentDurationNanos(0L).setBytesProcessedDuringAlignment(0L).setCheckpointStartDelayNanos(this.latestAsyncCheckpointStartDelayNanos);
            this.subtaskCheckpointCoordinator.initInputsCheckpoint(checkpointMetaData.getCheckpointId(), checkpointOptions);
            boolean success = this.performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
            if (!success) {
                this.declineCheckpoint(checkpointMetaData.getCheckpointId());
            }
            boolean bl = success;
            return bl;
        }
        catch (Exception e) {
            if (this.isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + ".", e);
            }
            LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{checkpointMetaData.getCheckpointId(), this.getName(), e});
            boolean bl = false;
            return bl;
        }
        finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }

    private boolean triggerUnfinishedChannelsCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        Optional<CheckpointBarrierHandler> checkpointBarrierHandler = this.getCheckpointBarrierHandler();
        Preconditions.checkState((boolean)checkpointBarrierHandler.isPresent(), (Object)"CheckpointBarrier should exist for tasks with network inputs.");
        CheckpointBarrier barrier = new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions);
        for (IndexedInputGate inputGate : this.getEnvironment().getAllInputGates()) {
            if (inputGate.isFinished()) continue;
            for (InputChannelInfo channelInfo : inputGate.getUnfinishedChannels()) {
                checkpointBarrierHandler.get().processBarrier(barrier, channelInfo, true);
            }
        }
        return true;
    }

    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
        return Optional.empty();
    }

    @Override
    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetrics) throws IOException {
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            this.performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        }
        catch (CancelTaskException e) {
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", (Object)this.getName(), (Object)checkpointMetaData.getCheckpointId());
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + ".", e);
        }
        finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }

    @Override
    public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) throws IOException {
        if (this.isCurrentSyncSavepoint(checkpointId)) {
            throw new FlinkRuntimeException("Stop-with-savepoint failed.");
        }
        this.subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, this.operatorChain);
    }

    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetrics) throws Exception {
        SnapshotType checkpointType = checkpointOptions.getCheckpointType();
        LOG.debug("Starting checkpoint {} {} on task {}", new Object[]{checkpointMetaData.getCheckpointId(), checkpointType, this.getName()});
        if (this.isRunning) {
            this.actionExecutor.runThrowing(() -> {
                if (this.isSynchronous(checkpointType)) {
                    this.setSynchronousSavepoint(checkpointMetaData.getCheckpointId());
                }
                if (this.areCheckpointsWithFinishedTasksEnabled() && this.endOfDataReceived && this.finalCheckpointMinId == null) {
                    this.finalCheckpointMinId = checkpointMetaData.getCheckpointId();
                }
                this.subtaskCheckpointCoordinator.checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics, this.operatorChain, this.finishedOperators, this::isRunning);
            });
            return true;
        }
        this.actionExecutor.runThrowing(() -> {
            CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            this.recordWriter.broadcastEvent(message);
        });
        return false;
    }

    private boolean isSynchronous(SnapshotType checkpointType) {
        return checkpointType.isSavepoint() && ((SavepointType)checkpointType).isSynchronous();
    }

    private void checkForcedFullSnapshotSupport(CheckpointOptions checkpointOptions) {
        SavepointType savepointType;
        if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT) && !this.stateBackend.supportsNoClaimRestoreMode()) {
            throw new IllegalStateException(String.format("Configured state backend (%s) does not support enforcing a full snapshot. If you are restoring in %s mode, please consider choosing %s mode.", this.stateBackend, RecoveryClaimMode.NO_CLAIM, RecoveryClaimMode.CLAIM));
        }
        if (checkpointOptions.getCheckpointType().isSavepoint() && !this.stateBackend.supportsSavepointFormat((savepointType = (SavepointType)checkpointOptions.getCheckpointType()).getFormatType())) {
            throw new IllegalStateException(String.format("Configured state backend (%s) does not support %s savepoints", this.stateBackend, savepointType.getFormatType()));
        }
    }

    protected void declineCheckpoint(long checkpointId) {
        this.getEnvironment().declineCheckpoint(checkpointId, new CheckpointException("Task Name" + this.getName(), CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
    }

    public final ExecutorService getAsyncOperationsThreadPool() {
        return this.asyncOperationsThreadPool;
    }

    @Override
    public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
        return this.notifyCheckpointOperation(() -> this.notifyCheckpointComplete(checkpointId), String.format("checkpoint %d complete", checkpointId));
    }

    @Override
    public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {
        return this.notifyCheckpointOperation(() -> {
            if (latestCompletedCheckpointId > 0L) {
                this.notifyCheckpointComplete(latestCompletedCheckpointId);
            }
            if (this.isCurrentSyncSavepoint(checkpointId)) {
                throw new FlinkRuntimeException("Stop-with-savepoint failed.");
            }
            this.subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, this.operatorChain, this::isRunning);
        }, String.format("checkpoint %d aborted", checkpointId));
    }

    @Override
    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
        return this.notifyCheckpointOperation(() -> this.subtaskCheckpointCoordinator.notifyCheckpointSubsumed(checkpointId, this.operatorChain, this::isRunning), String.format("checkpoint %d subsumed", checkpointId));
    }

    private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        this.mailboxProcessor.getMailboxExecutor(Integer.MAX_VALUE).execute(() -> {
            try {
                runnable.run();
            }
            catch (Exception ex) {
                result.completeExceptionally(ex);
                throw ex;
            }
            result.complete(null);
        }, description);
        return result;
    }

    private void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Notify checkpoint {} complete on task {}", (Object)checkpointId, (Object)this.getName());
        if (checkpointId <= this.latestReportCheckpointId) {
            return;
        }
        this.latestReportCheckpointId = checkpointId;
        this.subtaskCheckpointCoordinator.notifyCheckpointComplete(checkpointId, this.operatorChain, this::isRunning);
        if (this.isRunning) {
            if (this.isCurrentSyncSavepoint(checkpointId)) {
                this.finalCheckpointCompleted.complete(null);
            } else if (this.syncSavepoint == null && this.finalCheckpointMinId != null && checkpointId >= this.finalCheckpointMinId) {
                this.finalCheckpointCompleted.complete(null);
            }
        }
    }

    private void tryShutdownTimerService() {
        long timeoutMs = ((Duration)this.getEnvironment().getTaskManagerInfo().getConfiguration().get(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS)).toMillis();
        this.tryShutdownTimerService(timeoutMs, this.timerService);
        this.tryShutdownTimerService(timeoutMs, this.systemTimerService);
    }

    private void tryShutdownTimerService(long timeoutMs, TimerService timerService) {
        if (!timerService.isTerminated() && !timerService.shutdownServiceUninterruptible(timeoutMs)) {
            LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", (Object)timeoutMs);
        }
    }

    @Override
    public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
        try {
            this.mainMailboxExecutor.execute(() -> this.operatorChain.dispatchOperatorEvent(operator, event), "dispatch operator event");
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
    }

    private StateBackend createStateBackend() throws Exception {
        StateBackend fromApplication = this.configuration.getStateBackend(this.getUserCodeClassLoader());
        return StateBackendLoader.fromApplicationOrConfigOrDefault(fromApplication, this.getJobConfiguration(), this.getEnvironment().getTaskManagerInfo().getConfiguration(), this.getUserCodeClassLoader(), LOG);
    }

    private CheckpointStorage createCheckpointStorage(StateBackend backend) throws Exception {
        CheckpointStorage fromApplication = this.configuration.getCheckpointStorage(this.getUserCodeClassLoader());
        return CheckpointStorageLoader.load(fromApplication, backend, this.getJobConfiguration(), this.getEnvironment().getTaskManagerInfo().getConfiguration(), this.getUserCodeClassLoader(), LOG);
    }

    @VisibleForTesting
    TimerService getTimerService() {
        return this.timerService;
    }

    @VisibleForTesting
    OP getMainOperator() {
        return this.mainOperator;
    }

    @VisibleForTesting
    StreamTaskActionExecutor getActionExecutor() {
        return this.actionExecutor;
    }

    public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
        return mailboxExecutor -> new ProcessingTimeServiceImpl(this.timerService, callback -> this.deferCallbackToMailbox(mailboxExecutor, (ProcessingTimeService.ProcessingTimeCallback)callback));
    }

    @Override
    public void handleAsyncException(String message, Throwable exception) {
        if (this.isRestoring || this.isRunning) {
            this.asyncExceptionHandler.handleAsyncException(message, exception);
        }
    }

    public String toString() {
        return this.getName();
    }

    public final CloseableRegistry getCancelables() {
        return this.cancelables;
    }

    @VisibleForTesting
    public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig configuration, Environment environment) {
        List recordWrites = StreamTask.createRecordWriters(configuration, environment);
        if (recordWrites.size() == 1) {
            return new SingleRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(recordWrites.get(0));
        }
        if (recordWrites.size() == 0) {
            return new NonRecordWriter<SerializationDelegate<StreamRecord<OUT>>>();
        }
        return new MultipleRecordWriters<SerializationDelegate<StreamRecord<OUT>>>(recordWrites);
    }

    private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(StreamConfig configuration, Environment environment) {
        ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
        List<NonChainedOutput> outputsInOrder = configuration.getVertexNonChainedOutputs(environment.getUserCodeClassLoader().asClassLoader());
        int index = 0;
        for (NonChainedOutput streamOutput : outputsInOrder) {
            StreamTask.replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(environment, streamOutput, index);
            recordWriters.add(StreamTask.createRecordWriter(streamOutput, index++, environment, environment.getTaskInfo().getTaskNameWithSubtasks(), streamOutput.getBufferTimeout()));
        }
        return recordWriters;
    }

    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(Environment environment, NonChainedOutput streamOutput, int outputIndex) {
        if (streamOutput.getPartitioner() instanceof ForwardPartitioner && environment.getWriter(outputIndex).getNumberOfSubpartitions() != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
            LOG.debug("Replacing forward partitioner with rebalance for {}", (Object)environment.getTaskInfo().getTaskNameWithSubtasks());
            streamOutput.setPartitioner(new RebalancePartitioner());
        }
    }

    private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(NonChainedOutput streamOutput, int outputIndex, Environment environment, String taskNameWithSubtask, long bufferTimeout) {
        int numKeyGroups;
        StreamPartitioner outputPartitioner = null;
        try {
            outputPartitioner = (StreamPartitioner)InstantiationUtil.clone(streamOutput.getPartitioner(), (ClassLoader)environment.getUserCodeClassLoader().asClassLoader());
        }
        catch (Exception e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
        LOG.debug("Using partitioner {} for output {} of task {}", new Object[]{outputPartitioner, outputIndex, taskNameWithSubtask});
        ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
        if (outputPartitioner instanceof ConfigurableStreamPartitioner && 0 < (numKeyGroups = bufferWriter.getNumTargetKeyGroups())) {
            ((ConfigurableStreamPartitioner)((Object)outputPartitioner)).configure(numKeyGroups);
        }
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder().setChannelSelector(outputPartitioner).setTimeout(bufferTimeout).setTaskName(taskNameWithSubtask).build(bufferWriter);
        output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return output;
    }

    private void handleTimerException(Exception ex) {
        this.handleAsyncException("Caught exception while processing timer.", new TimerException(ex));
    }

    @VisibleForTesting
    ProcessingTimeService.ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeService.ProcessingTimeCallback callback) {
        return timestamp -> mailboxExecutor.execute(() -> this.invokeProcessingTimeCallback(callback, timestamp), "Timer callback for %s @ %d", new Object[]{callback, timestamp});
    }

    private void invokeProcessingTimeCallback(ProcessingTimeService.ProcessingTimeCallback callback, long timestamp) {
        try {
            callback.onProcessingTime(timestamp);
        }
        catch (Throwable t) {
            this.handleAsyncException("Caught exception while processing timer.", new TimerException(t));
        }
    }

    protected long getAsyncCheckpointStartDelayNanos() {
        return this.latestAsyncCheckpointStartDelayNanos;
    }

    @Override
    public boolean isUsingNonBlockingInput() {
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void disableInterruptOnCancel() {
        Object object = this.shouldInterruptOnCancelLock;
        synchronized (object) {
            this.shouldInterruptOnCancel = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void maybeInterruptOnCancel(Thread toInterrupt, @Nullable String taskName, @Nullable Long timeout) {
        Object object = this.shouldInterruptOnCancelLock;
        synchronized (object) {
            if (this.shouldInterruptOnCancel) {
                if (taskName != null && timeout != null) {
                    Task.logTaskThreadStackTrace(toInterrupt, taskName, timeout, "interrupting");
                }
                toInterrupt.interrupt();
            }
        }
    }

    @Override
    public final Environment getEnvironment() {
        return this.environment;
    }

    static class StreamTaskAsyncExceptionHandler
    implements AsyncExceptionHandler {
        private final Environment environment;

        StreamTaskAsyncExceptionHandler(Environment environment) {
            this.environment = environment;
        }

        @Override
        public void handleAsyncException(String message, Throwable exception) {
            this.environment.failExternally(new AsynchronousException(message, exception));
        }
    }

    private static class ResumeWrapper
    implements Runnable {
        private final MailboxDefaultAction.Suspension suspendedDefaultAction;
        @Nullable
        private final PeriodTimer timer;

        public ResumeWrapper(MailboxDefaultAction.Suspension suspendedDefaultAction, @Nullable PeriodTimer timer) {
            this.suspendedDefaultAction = suspendedDefaultAction;
            if (timer != null) {
                timer.markStart();
            }
            this.timer = timer;
        }

        @Override
        public void run() {
            if (this.timer != null) {
                this.timer.markEnd();
            }
            this.suspendedDefaultAction.resume();
        }
    }

    @FunctionalInterface
    public static interface CanEmitBatchOfRecordsChecker {
        public boolean check();
    }
}

