/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

@Internal
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private static final String STATE_NAME = "_async_wait_operator_state_";
    private final int capacity;
    private final AsyncDataStream.OutputMode outputMode;
    private final long timeout;
    private final AsyncRetryStrategy<OUT> asyncRetryStrategy;
    private final boolean retryEnabled;
    private transient StreamElementSerializer<IN> inStreamElementSerializer;
    private transient ListState<StreamElement> recoveredStreamElements;
    private transient StreamElementQueue<OUT> queue;
    private transient Set<RetryableResultHandlerDelegator> inFlightDelayRetryHandlers;
    private final transient MailboxExecutor mailboxExecutor;
    private transient TimestampedCollector<OUT> timestampedCollector;
    private transient boolean isObjectReuseEnabled;
    private transient Predicate<Collection<OUT>> retryResultPredicate;
    private transient Predicate<Throwable> retryExceptionPredicate;
    private transient AtomicBoolean retryDisabledOnFinish;

    public AsyncWaitOperator(StreamOperatorParameters<OUT> parameters, @Nonnull AsyncFunction<IN, OUT> asyncFunction, long timeout, int capacity, @Nonnull AsyncDataStream.OutputMode outputMode, @Nonnull AsyncRetryStrategy<OUT> asyncRetryStrategy, @Nonnull ProcessingTimeService processingTimeService, @Nonnull MailboxExecutor mailboxExecutor) {
        super(null, asyncFunction);
        Preconditions.checkArgument((capacity > 0 ? 1 : 0) != 0, (Object)"The number of concurrent async operation should be greater than 0.");
        this.capacity = capacity;
        this.outputMode = (AsyncDataStream.OutputMode)((Object)Preconditions.checkNotNull((Object)((Object)outputMode), (String)"outputMode"));
        this.timeout = timeout;
        this.asyncRetryStrategy = asyncRetryStrategy;
        this.retryEnabled = asyncRetryStrategy != AsyncRetryStrategies.NO_RETRY_STRATEGY && (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent() || asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent());
        this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
        this.mailboxExecutor = mailboxExecutor;
        this.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
    }

    protected void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        this.inStreamElementSerializer = new StreamElementSerializer(this.getOperatorConfig().getTypeSerializerIn1(this.getUserCodeClassloader()));
        switch (this.outputMode) {
            case ORDERED: {
                this.queue = new OrderedStreamElementQueue(this.capacity);
                break;
            }
            case UNORDERED: {
                this.queue = new UnorderedStreamElementQueue(this.capacity);
                break;
            }
            default: {
                throw new IllegalStateException("Unknown async mode: " + this.outputMode + ".");
            }
        }
        if (this.retryEnabled) {
            this.retryResultPredicate = this.asyncRetryStrategy.getRetryPredicate().resultPredicate().orElse(ignore -> false);
            this.retryExceptionPredicate = this.asyncRetryStrategy.getRetryPredicate().exceptionPredicate().orElse(ignore -> false);
        }
        this.timestampedCollector = new TimestampedCollector(this.output);
    }

    public void open() throws Exception {
        super.open();
        this.isObjectReuseEnabled = this.getExecutionConfig().isObjectReuseEnabled();
        if (this.retryEnabled) {
            this.inFlightDelayRetryHandlers = new HashSet<RetryableResultHandlerDelegator>();
            this.retryDisabledOnFinish = new AtomicBoolean(false);
        }
        if (this.recoveredStreamElements != null) {
            for (StreamElement element : (Iterable)this.recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    this.processElement(element.asRecord());
                    continue;
                }
                if (element.isWatermark()) {
                    this.processWatermark(element.asWatermark());
                    continue;
                }
                if (element.isLatencyMarker()) {
                    this.processLatencyMarker(element.asLatencyMarker());
                    continue;
                }
                throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
            }
            this.recoveredStreamElements = null;
        }
    }

    public void processElement(StreamRecord<IN> record) throws Exception {
        StreamRecord element = this.isObjectReuseEnabled ? (StreamRecord)this.inStreamElementSerializer.copy(record) : record;
        ResultFuture<OUT> entry = this.addToWorkQueue((StreamElement)element);
        if (this.retryEnabled) {
            RetryableResultHandlerDelegator resultHandler = new RetryableResultHandlerDelegator(element, entry, this.getProcessingTimeService());
            assert (this.timeout > 0L);
            resultHandler.registerTimeout(this.timeout);
            ((AsyncFunction)this.userFunction).asyncInvoke(element.getValue(), resultHandler);
        } else {
            ResultHandler resultHandler = new ResultHandler(element, entry);
            if (this.timeout > 0L) {
                resultHandler.registerTimeout(this.getProcessingTimeService(), this.timeout);
            }
            ((AsyncFunction)this.userFunction).asyncInvoke(element.getValue(), resultHandler);
        }
    }

    public void processWatermark(Watermark mark) throws Exception {
        this.addToWorkQueue((StreamElement)mark);
        this.outputCompletedElement();
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        ListState partitionableState = this.getOperatorStateBackend().getListState(new ListStateDescriptor(STATE_NAME, this.inStreamElementSerializer));
        try {
            partitionableState.update(this.queue.values());
        }
        catch (Exception e) {
            partitionableState.clear();
            throw new Exception("Could not add stream element queue entries to operator state backend of operator " + this.getOperatorName() + ".", e);
        }
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.recoveredStreamElements = context.getOperatorStateStore().getListState(new ListStateDescriptor(STATE_NAME, this.inStreamElementSerializer));
    }

    public void endInput() throws Exception {
        this.finishInFlightDelayedRetry();
        this.waitInFlightInputsFinished();
    }

    private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement) throws InterruptedException {
        Optional<ResultFuture<OUT>> queueEntry;
        while (!(queueEntry = this.queue.tryPut(streamElement)).isPresent()) {
            this.mailboxExecutor.yield();
        }
        return queueEntry.get();
    }

    private void finishInFlightDelayedRetry() throws Exception {
        if (this.retryEnabled) {
            this.retryDisabledOnFinish.set(true);
            if (this.inFlightDelayRetryHandlers.size() > 0) {
                for (RetryableResultHandlerDelegator delegator : this.inFlightDelayRetryHandlers) {
                    assert (delegator.delayedRetryTimer != null);
                    delegator.cancelRetryTimer();
                    this.tryOnce(delegator);
                }
                this.inFlightDelayRetryHandlers.clear();
            }
        }
    }

    private void waitInFlightInputsFinished() throws InterruptedException {
        while (!this.queue.isEmpty()) {
            this.mailboxExecutor.yield();
        }
    }

    private void outputCompletedElement() {
        if (this.queue.hasCompletedElements()) {
            this.queue.emitCompletedElement(this.timestampedCollector);
            if (this.queue.hasCompletedElements()) {
                try {
                    this.mailboxExecutor.execute(this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
                }
                catch (RejectedExecutionException mailboxClosedException) {
                    LOG.debug("Attempt to complete element is ignored since the mailbox rejected the execution.", (Throwable)mailboxClosedException);
                }
            }
        }
    }

    private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
        ++resultHandlerDelegator.currentAttempts;
        ((AsyncFunction)this.userFunction).asyncInvoke(resultHandlerDelegator.resultHandler.inputRecord.getValue(), resultHandlerDelegator);
    }

    private ScheduledFuture<?> registerTimer(ProcessingTimeService processingTimeService, long timeout, ThrowingConsumer<Void, Exception> callback) {
        long timeoutTimestamp = timeout + processingTimeService.getCurrentProcessingTime();
        return processingTimeService.registerTimer(timeoutTimestamp, timestamp -> callback.accept(null));
    }

    private class RetryableResultHandlerDelegator
    implements ResultFuture<OUT> {
        private final ResultHandler resultHandler;
        private final ProcessingTimeService processingTimeService;
        private ScheduledFuture<?> delayedRetryTimer;
        private int currentAttempts = 1;
        private final AtomicBoolean retryAwaiting = new AtomicBoolean(false);

        public RetryableResultHandlerDelegator(StreamRecord<IN> inputRecord, ResultFuture<OUT> resultFuture, ProcessingTimeService processingTimeService) {
            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
            this.processingTimeService = processingTimeService;
        }

        private void registerTimeout(long timeout) {
            this.resultHandler.timeoutTimer = AsyncWaitOperator.this.registerTimer(this.processingTimeService, timeout, (ThrowingConsumer<Void, Exception>)((ThrowingConsumer)t -> this.timerTriggered()));
        }

        private void cancelRetryTimer() {
            if (this.delayedRetryTimer != null) {
                this.delayedRetryTimer.cancel(false);
            }
        }

        private void timerTriggered() throws Exception {
            if (!this.resultHandler.completed.get()) {
                this.cancelRetryTimer();
                this.retryAwaiting.set(false);
                ((AsyncFunction)AsyncWaitOperator.this.userFunction).timeout(this.resultHandler.inputRecord.getValue(), this);
            }
        }

        @Override
        public void complete(Collection<OUT> results) {
            Preconditions.checkNotNull(results, (String)"Results must not be null, use empty collection to emit nothing");
            if (!AsyncWaitOperator.this.retryDisabledOnFinish.get() && this.resultHandler.inputRecord.isRecord()) {
                this.processRetryInMailBox(results, null);
            } else {
                this.cancelRetryTimer();
                this.resultHandler.complete(results);
            }
        }

        @Override
        public void completeExceptionally(Throwable error) {
            if (!AsyncWaitOperator.this.retryDisabledOnFinish.get() && this.resultHandler.inputRecord.isRecord()) {
                this.processRetryInMailBox(null, error);
            } else {
                this.cancelRetryTimer();
                this.resultHandler.completeExceptionally(error);
            }
        }

        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
            AsyncWaitOperator.this.mailboxExecutor.execute(() -> this.processRetry(results, error), "delayed retry or complete");
        }

        private void processRetry(Collection<OUT> results, Throwable error) {
            boolean satisfy;
            if (!this.retryAwaiting.compareAndSet(false, true)) {
                return;
            }
            boolean bl = satisfy = null != results && AsyncWaitOperator.this.retryResultPredicate.test(results) || null != error && AsyncWaitOperator.this.retryExceptionPredicate.test(error);
            if (satisfy && AsyncWaitOperator.this.asyncRetryStrategy.canRetry(this.currentAttempts) && !AsyncWaitOperator.this.retryDisabledOnFinish.get()) {
                long nextBackoffTimeMillis = AsyncWaitOperator.this.asyncRetryStrategy.getBackoffTimeMillis(this.currentAttempts);
                long delayedRetry = nextBackoffTimeMillis + AsyncWaitOperator.this.getProcessingTimeService().getCurrentProcessingTime();
                this.delayedRetryTimer = this.processingTimeService.registerTimer(delayedRetry, timestamp -> this.doRetry());
                if (this.currentAttempts == 1) {
                    AsyncWaitOperator.this.inFlightDelayRetryHandlers.add(this);
                }
            } else {
                if (this.currentAttempts > 1) {
                    AsyncWaitOperator.this.inFlightDelayRetryHandlers.remove(this);
                }
                if (null != results) {
                    this.resultHandler.complete(results);
                } else {
                    this.resultHandler.completeExceptionally(error);
                }
            }
        }

        private void doRetry() throws Exception {
            if (this.retryAwaiting.compareAndSet(true, false)) {
                AsyncWaitOperator.this.tryOnce(this);
            }
        }
    }

    private class ResultHandler
    implements ResultFuture<OUT> {
        private ScheduledFuture<?> timeoutTimer;
        private final StreamRecord<IN> inputRecord;
        private final ResultFuture<OUT> resultFuture;
        private final AtomicBoolean completed = new AtomicBoolean(false);

        ResultHandler(StreamRecord<IN> inputRecord, ResultFuture<OUT> resultFuture) {
            this.inputRecord = inputRecord;
            this.resultFuture = resultFuture;
        }

        @Override
        public void complete(Collection<OUT> results) {
            if (!this.completed.compareAndSet(false, true)) {
                return;
            }
            this.processInMailbox(results);
        }

        private void processInMailbox(Collection<OUT> results) {
            AsyncWaitOperator.this.mailboxExecutor.execute(() -> this.processResults(results), "Result in AsyncWaitOperator of input %s", new Object[]{results});
        }

        private void processResults(Collection<OUT> results) {
            if (this.timeoutTimer != null) {
                this.timeoutTimer.cancel(true);
            }
            this.resultFuture.complete(results);
            AsyncWaitOperator.this.outputCompletedElement();
        }

        @Override
        public void completeExceptionally(Throwable error) {
            if (!this.completed.compareAndSet(false, true)) {
                return;
            }
            AsyncWaitOperator.this.getContainingTask().getEnvironment().failExternally((Throwable)new Exception("Could not complete the stream element: " + this.inputRecord + ".", error));
            this.processInMailbox(Collections.emptyList());
        }

        private void registerTimeout(ProcessingTimeService processingTimeService, long timeout) {
            this.timeoutTimer = AsyncWaitOperator.this.registerTimer(processingTimeService, timeout, (ThrowingConsumer<Void, Exception>)((ThrowingConsumer)t -> this.timerTriggered()));
        }

        private void timerTriggered() throws Exception {
            if (!this.completed.get()) {
                ((AsyncFunction)AsyncWaitOperator.this.userFunction).timeout(this.inputRecord.getValue(), this);
            }
        }
    }
}

