/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.runtime.operators.CrossDriver;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.Record;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicBooleanAssert;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests {@link CrossDriver} through the {@link DriverTestBase} harness for the four nested-loop
 * strategies ({@code NESTEDLOOP_BLOCKED_OUTER_FIRST}, {@code NESTEDLOOP_BLOCKED_OUTER_SECOND},
 * {@code NESTEDLOOP_STREAMED_OUTER_FIRST}, {@code NESTEDLOOP_STREAMED_OUTER_SECOND}).
 *
 * <p>Each strategy is exercised in four scenarios: a regular run whose output size is checked, a
 * run with a stub that throws, a run where the second input is empty, and a run that is canceled
 * while the second input is still producing records.
 */
class CrossTaskTest
extends DriverTestBase<CrossFunction<Record, Record, Record>> {
    /**
     * Memory amount passed to the test base constructor; also the numerator of the relative
     * memory share configured for the driver.
     */
    private static final long CROSS_MEM = 0x100000L;

    /** Key count of the first input in all tests. */
    private static final int KEY_CNT_1 = 10;
    /** Values-per-key count of the first input in all tests. */
    private static final int VAL_CNT_1 = 1;
    /** Key count of the second input in the non-empty, non-cancel tests. */
    private static final int KEY_CNT_2 = 100;
    /** Values-per-key count of the second input in the non-empty, non-cancel tests. */
    private static final int VAL_CNT_2 = 4;

    /** {@link #CROSS_MEM} divided by the memory manager's total memory size. */
    private final double crossFrac;
    /** Collector handed to the harness; its record count is what the size tests assert on. */
    private final DriverTestBase.CountingOutputCollector output = new DriverTestBase.CountingOutputCollector();

    /**
     * Creates the test with {@link #CROSS_MEM} of memory and derives the relative memory share
     * used for the driver from the memory manager the base class set up.
     *
     * @param config execution config forwarded to the test base
     */
    CrossTaskTest(ExecutionConfig config) {
        super(config, CROSS_MEM, 0);
        this.crossFrac = (double) CROSS_MEM / (double) this.getMemoryManager().getMemorySize();
    }

    // ------------------------------------------------------------------------------------------
    //  Regular runs: output size must equal the product of all four counts
    // ------------------------------------------------------------------------------------------

    @TestTemplate
    void testBlock1CrossTask() {
        assertCrossResultSize(
                KEY_CNT_1, VAL_CNT_1, KEY_CNT_2, VAL_CNT_2, DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
    }

    @TestTemplate
    void testBlock2CrossTask() {
        assertCrossResultSize(
                KEY_CNT_1, VAL_CNT_1, KEY_CNT_2, VAL_CNT_2, DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
    }

    @TestTemplate
    void testStream1CrossTask() {
        assertCrossResultSize(
                KEY_CNT_1, VAL_CNT_1, KEY_CNT_2, VAL_CNT_2, DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
    }

    @TestTemplate
    void testStream2CrossTask() {
        assertCrossResultSize(
                KEY_CNT_1, VAL_CNT_1, KEY_CNT_2, VAL_CNT_2, DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
    }

    // ------------------------------------------------------------------------------------------
    //  Failing stub: the ExpectedTestException must surface from testDriver
    // ------------------------------------------------------------------------------------------

    @TestTemplate
    void testFailingBlockCrossTask() {
        assertStubFailurePropagates(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
    }

    @TestTemplate
    void testFailingBlockCrossTask2() {
        assertStubFailurePropagates(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
    }

    @TestTemplate
    void testFailingStreamCrossTask() {
        assertStubFailurePropagates(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
    }

    @TestTemplate
    void testFailingStreamCrossTask2() {
        assertStubFailurePropagates(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
    }

    // ------------------------------------------------------------------------------------------
    //  Empty second input: expected output size is zero
    // ------------------------------------------------------------------------------------------

    @TestTemplate
    void testStreamEmptyInnerCrossTask() {
        assertCrossResultSize(KEY_CNT_1, VAL_CNT_1, 0, 0, DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
    }

    @TestTemplate
    void testStreamEmptyOuterCrossTask() {
        assertCrossResultSize(KEY_CNT_1, VAL_CNT_1, 0, 0, DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
    }

    @TestTemplate
    void testBlockEmptyInnerCrossTask() {
        assertCrossResultSize(KEY_CNT_1, VAL_CNT_1, 0, 0, DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
    }

    @TestTemplate
    void testBlockEmptyOuterCrossTask() {
        assertCrossResultSize(KEY_CNT_1, VAL_CNT_1, 0, 0, DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
    }

    // ------------------------------------------------------------------------------------------
    //  Cancellation: testDriver must return without throwing once the task is canceled
    // ------------------------------------------------------------------------------------------

    @TestTemplate
    void testCancelBlockCrossTaskInit() {
        assertCancelingSucceeds(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
    }

    @TestTemplate
    void testCancelBlockCrossTaskCrossing() {
        assertCancelingSucceeds(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
    }

    @TestTemplate
    void testCancelStreamCrossTaskInit() {
        assertCancelingSucceeds(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
    }

    @TestTemplate
    void testCancelStreamCrossTaskCrossing() {
        assertCancelingSucceeds(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
    }

    // ------------------------------------------------------------------------------------------
    //  Shared scenario helpers
    // ------------------------------------------------------------------------------------------

    /**
     * Registers the counting output and two {@link UniformRecordGenerator} inputs, then applies
     * the strategy and memory share. The call order matches the original per-test setup.
     */
    private void prepareUniformInputs(
            int keyCnt1, int valCnt1, int keyCnt2, int valCnt2, DriverStrategy strategy) {
        this.setOutput(this.output);
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
        applyStrategy(strategy);
    }

    /** Sets the driver strategy and the relative driver memory on the task config. */
    private void applyStrategy(DriverStrategy strategy) {
        this.getTaskConfig().setDriverStrategy(strategy);
        this.getTaskConfig().setRelativeMemoryDriver(this.crossFrac);
    }

    /**
     * Runs a {@link CrossDriver} with {@link MockCrossStub} over two uniform inputs and asserts
     * that the number of collected records equals
     * {@code keyCnt1 * valCnt1 * keyCnt2 * valCnt2}.
     *
     * <p>Any exception from the driver fails the test with the exception attached as the cause.
     */
    private void assertCrossResultSize(
            int keyCnt1, int valCnt1, int keyCnt2, int valCnt2, DriverStrategy strategy) {
        final int expCnt = keyCnt1 * valCnt1 * keyCnt2 * valCnt2;
        prepareUniformInputs(keyCnt1, valCnt1, keyCnt2, valCnt2, strategy);

        final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
        try {
            this.testDriver(testTask, MockCrossStub.class);
        } catch (Exception e) {
            // Attach the cause so the report shows why the driver failed, not only that it did.
            Assertions.fail("Test failed due to an exception.", e);
        }

        Assertions.assertThat(this.output.getNumberOfRecords())
                .withFailMessage("Wrong result size.")
                .isEqualTo(expCnt);
    }

    /**
     * Runs a {@link CrossDriver} with {@link MockFailingCrossStub} over the default-sized inputs
     * and asserts that {@code testDriver} throws an {@link ExpectedTestException}.
     */
    private void assertStubFailurePropagates(DriverStrategy strategy) {
        prepareUniformInputs(KEY_CNT_1, VAL_CNT_1, KEY_CNT_2, VAL_CNT_2, strategy);

        final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
        Assertions.assertThatThrownBy(() -> this.testDriver(testTask, MockFailingCrossStub.class))
                .isInstanceOf(ExpectedTestException.class);
    }

    /**
     * Runs a {@link CrossDriver} on a separate thread with a {@link DelayingInfinitiveInputIterator}
     * as second input, starts a {@link TaskCancelThread} against it, and asserts that
     * {@code testDriver} returned normally (no exception) after both threads finished.
     */
    private void assertCancelingSucceeds(DriverStrategy strategy) {
        this.setOutput(this.output);
        this.addInput(new UniformRecordGenerator(KEY_CNT_1, VAL_CNT_1, false));
        // NOTE(review): presumably a never-ending input with a per-record delay of 100 — confirm
        // the unit against DelayingInfinitiveInputIterator.
        this.addInput(new DelayingInfinitiveInputIterator(100));
        applyStrategy(strategy);

        final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
        final AtomicBoolean success = new AtomicBoolean(false);

        final Thread taskRunner = new Thread(() -> {
            try {
                this.testDriver(testTask, MockCrossStub.class);
                success.set(true);
            } catch (Exception e) {
                // The failure is reported through 'success' staying false; the trace printed
                // here is the only diagnostic for what went wrong on this thread.
                e.printStackTrace();
            }
        });
        taskRunner.start();

        final TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
        tct.start();

        try {
            tct.join();
            taskRunner.join();
        } catch (InterruptedException ie) {
            // Restore the interrupt flag for the test runner before failing.
            Thread.currentThread().interrupt();
            Assertions.fail("Joining threads failed", ie);
        }

        Assertions.assertThat(success)
                .withFailMessage("Exception was thrown despite proper canceling.")
                .isTrue();
    }

    // ------------------------------------------------------------------------------------------
    //  Stubs
    // ------------------------------------------------------------------------------------------

    /** Cross function that returns its first argument unchanged for every pair. */
    public static final class MockCrossStub
    implements CrossFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;

        @Override
        public Record cross(Record record1, Record record2) throws Exception {
            return record1;
        }
    }

    /**
     * Cross function that returns its first argument for the first nine invocations and throws
     * an {@link ExpectedTestException} on the tenth and every later invocation.
     */
    public static final class MockFailingCrossStub
    implements CrossFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;
        /** Number of {@code cross} invocations seen so far on this instance. */
        private int cnt = 0;

        @Override
        public Record cross(Record record1, Record record2) {
            if (++this.cnt >= 10) {
                throw new ExpectedTestException();
            }
            return record1;
        }
    }
}

