/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util.asyncprocessing;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTestUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;

public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
extends KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> {
    private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
    private ThrowingConsumer<StreamRecord<IN1>, Exception> processor1;
    private ThrowingConsumer<StreamRecord<IN2>, Exception> processor2;
    private final ExecutorService executor;

    public static <K, IN1, IN2, OUT, OP extends AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>> OP create(FunctionWithException<ExecutorService, OP, Exception> constructor) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture future = new CompletableFuture();
        executor.execute(() -> {
            try {
                future.complete((AsyncKeyedTwoInputStreamOperatorTestHarness)constructor.apply((Object)executor));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return (OP)((AsyncKeyedTwoInputStreamOperatorTestHarness)future.get());
    }

    public static <K, IN1, IN2, OUT> AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> create(TwoInputStreamOperator<IN1, IN2, OUT> operator, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {
        return AsyncKeyedTwoInputStreamOperatorTestHarness.create(operator, keySelector1, keySelector2, keyType, 1, 1, 0);
    }

    public static <K, IN1, IN2, OUT> AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> create(TwoInputStreamOperator<IN1, IN2, OUT> operator, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture future = new CompletableFuture();
        executor.execute(() -> {
            try {
                future.complete(new AsyncKeyedTwoInputStreamOperatorTestHarness(executor, operator, keySelector1, keySelector2, keyType, maxParallelism, numSubtasks, subtaskIndex));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return (AsyncKeyedTwoInputStreamOperatorTestHarness)future.get();
    }

    public AsyncKeyedTwoInputStreamOperatorTestHarness(ExecutorService executor, TwoInputStreamOperator<IN1, IN2, OUT> operator, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
        super(operator, keySelector1, keySelector2, keyType, maxParallelism, numSubtasks, subtaskIndex);
        Preconditions.checkState((boolean)(operator instanceof AsyncStateProcessingOperator), (Object)"Operator is not an AsyncStateProcessingOperator");
        this.twoInputOperator = operator;
        this.executor = executor;
        this.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
    }

    private ThrowingConsumer<StreamRecord<IN1>, Exception> getRecordProcessor1() {
        if (this.processor1 == null) {
            this.processor1 = RecordProcessorUtils.getRecordProcessor1(this.twoInputOperator);
        }
        return this.processor1;
    }

    private ThrowingConsumer<StreamRecord<IN2>, Exception> getRecordProcessor2() {
        if (this.processor2 == null) {
            this.processor2 = RecordProcessorUtils.getRecordProcessor2(this.twoInputOperator);
        }
        return this.processor2;
    }

    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        this.executeAndGet(() -> this.getRecordProcessor1().accept((Object)element));
    }

    @Override
    public void processElement1(IN1 value, long timestamp) throws Exception {
        this.processElement1(new StreamRecord(value, timestamp));
    }

    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        this.executeAndGet(() -> this.getRecordProcessor2().accept((Object)element));
    }

    @Override
    public void processElement2(IN2 value, long timestamp) throws Exception {
        this.processElement2(new StreamRecord(value, timestamp));
    }

    @Override
    public void processWatermark1(Watermark mark) throws Exception {
        this.executeAndGet(() -> this.twoInputOperator.processWatermark1(mark));
    }

    @Override
    public void processWatermark2(Watermark mark) throws Exception {
        this.executeAndGet(() -> this.twoInputOperator.processWatermark2(mark));
    }

    @Override
    public void processBothWatermarks(Watermark mark) throws Exception {
        this.executeAndGet(() -> this.twoInputOperator.processWatermark1(mark));
        this.executeAndGet(() -> this.twoInputOperator.processWatermark2(mark));
    }

    @Override
    public void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {
        this.executeAndGet(() -> this.twoInputOperator.processWatermarkStatus1(watermarkStatus));
    }

    @Override
    public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
        this.executeAndGet(() -> this.twoInputOperator.processWatermarkStatus2(watermarkStatus));
    }

    @Override
    public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
        this.executeAndGet(() -> this.twoInputOperator.processRecordAttributes1(recordAttributes));
    }

    @Override
    public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
        this.executeAndGet(() -> this.twoInputOperator.processRecordAttributes2(recordAttributes));
    }

    @Override
    public void endInput1() throws Exception {
        if (this.operator instanceof BoundedMultiInput) {
            this.executeAndGet(() -> ((BoundedMultiInput)this.operator).endInput(1));
        }
    }

    @Override
    public void endInput2() throws Exception {
        if (this.operator instanceof BoundedMultiInput) {
            this.executeAndGet(() -> ((BoundedMultiInput)this.operator).endInput(2));
        }
    }

    public void drainStateRequests() throws Exception {
        this.executeAndGet(() -> AsyncProcessingTestUtil.drain(this.operator));
    }

    @Override
    public void close() throws Exception {
        this.executeAndGet(() -> super.close());
        this.executor.shutdown();
    }

    @Override
    public int numKeyedStateEntries() {
        AbstractAsyncStateStreamOperator asyncOp = (AbstractAsyncStateStreamOperator)this.operator;
        AsyncKeyedStateBackend asyncKeyedStateBackend = asyncOp.getAsyncKeyedStateBackend();
        if (!(asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor)) {
            throw new UnsupportedOperationException(String.format("Unsupported async keyed state backend: %s", asyncKeyedStateBackend.getClass().getCanonicalName()));
        }
        CheckpointableKeyedStateBackend keyedStateBackend = ((AsyncKeyedStateBackendAdaptor)asyncKeyedStateBackend).getKeyedStateBackend();
        if (keyedStateBackend instanceof HeapKeyedStateBackend) {
            return ((HeapKeyedStateBackend)keyedStateBackend).numKeyValueStateEntries();
        }
        throw new UnsupportedOperationException(String.format("Unsupported keyed state backend: %s", keyedStateBackend.getClass().getCanonicalName()));
    }

    private void executeAndGet(RunnableWithException runnable) throws Exception {
        try {
            AsyncProcessingTestUtil.execute(this.executor, () -> {
                this.checkEnvState();
                runnable.run();
            }).get();
            this.checkEnvState();
        }
        catch (Exception e) {
            AsyncProcessingTestUtil.execute(this.executor, () -> this.mockTask.cleanUp(e)).get();
            throw AsyncProcessingTestUtil.unwrapAsyncException(e);
        }
    }

    private void checkEnvState() {
        if (this.getEnvironment().getActualExternalFailureCause().isPresent()) {
            Assertions.fail((String)"There is an error on other threads", (Throwable)this.getEnvironment().getActualExternalFailureCause().get());
        }
    }
}

