/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.runtime.operators.BuildFirstCachedJoinDriver;
import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicBooleanAssert;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.TestTemplate;

/**
 * Exercises {@link BuildFirstCachedJoinDriver} and {@link BuildSecondCachedJoinDriver} through the
 * {@link DriverTestBase} harness: regular joins over generated inputs, joins whose user function
 * throws, and joins that are canceled from a second thread while running.
 */
public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
    /** Third constructor argument handed to the test base (6 MiB). */
    private static final long HASH_MEM = 6L * 1024 * 1024;

    /** Fifth constructor argument handed to the test base (3 MiB). */
    private static final long SORT_MEM = 3L * 1024 * 1024;

    /** Iteration count passed to {@code testResettableDriver} by every resettable test. */
    private static final int NUM_ITERATIONS = 3;

    private static final String CANCEL_FAILURE_MESSAGE =
            "Test threw an exception even though it was properly canceled.";

    // Both comparators compare on field 0, read as an IntValue.
    private final RecordComparator comparator1 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    private final RecordComparator comparator2 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});

    /** Collects the join output of the tests that verify the result size. */
    private final List<Record> outList = new ArrayList<>();

    public CachedMatchTaskTest(ExecutionConfig config) {
        super(config, HASH_MEM, 2, SORT_MEM);
    }

    /** Build-first cached join of 20x1 against 10x2 records; checks the output size. */
    @TestTemplate
    void testHash1MatchTask() {
        runJoinAndVerifySize(
                new BuildFirstCachedJoinDriver<Record, Record, Record>(),
                DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED,
                20, 1, 10, 2);
    }

    /** Build-second cached join of 20x1 against 20x1 records; checks the output size. */
    @TestTemplate
    void testHash2MatchTask() {
        runJoinAndVerifySize(
                new BuildSecondCachedJoinDriver<Record, Record, Record>(),
                DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED,
                20, 1, 20, 1);
    }

    /** Build-first cached join of 20x1 against 20x20 records; checks the output size. */
    @TestTemplate
    void testHash3MatchTask() {
        runJoinAndVerifySize(
                new BuildFirstCachedJoinDriver<Record, Record, Record>(),
                DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED,
                20, 1, 20, 20);
    }

    /** Build-second cached join of 20x20 against 20x1 records; checks the output size. */
    @TestTemplate
    void testHash4MatchTask() {
        runJoinAndVerifySize(
                new BuildSecondCachedJoinDriver<Record, Record, Record>(),
                DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED,
                20, 20, 20, 1);
    }

    /** Build-first cached join of 20x20 against 20x20 records; checks the output size. */
    @TestTemplate
    void testHash5MatchTask() {
        runJoinAndVerifySize(
                new BuildFirstCachedJoinDriver<Record, Record, Record>(),
                DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED,
                20, 20, 20, 20);
    }

    /** Build-first cached join with a throwing stub must surface {@link ExpectedTestException}. */
    @TestTemplate
    void testFailingHashFirstMatchTask() {
        runFailingJoin(
                new BuildFirstCachedJoinDriver<Record, Record, Record>(),
                DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
    }

    /** Build-second cached join with a throwing stub must surface {@link ExpectedTestException}. */
    @TestTemplate
    void testFailingHashSecondMatchTask() {
        runFailingJoin(
                new BuildSecondCachedJoinDriver<Record, Record, Record>(),
                DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
    }

    /** Cancels a build-first cached join whose first input is a delaying, never-ending iterator. */
    @TestTemplate
    void testCancelHashMatchTaskWhileBuildFirst() {
        addInput(new DelayingInfinitiveInputIterator(100));
        addInput(new UniformRecordGenerator(20, 20, false));
        configureJoin(new NirvanaOutputList(), DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
        // NOTE(review): this is the only cancel test that runs the failing stub — confirm intent.
        runDriverAndCancel(
                new BuildFirstCachedJoinDriver<Record, Record, Record>(), MockFailingMatchStub.class);
    }

    /** Cancels a build-second cached join whose second input is a delaying, never-ending iterator. */
    @TestTemplate
    void testHashCancelMatchTaskWhileBuildSecond() {
        addInput(new UniformRecordGenerator(20, 20, false));
        addInput(new DelayingInfinitiveInputIterator(100));
        configureJoin(new NirvanaOutputList(), DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
        runDriverAndCancel(
                new BuildSecondCachedJoinDriver<Record, Record, Record>(), MockMatchStub.class);
    }

    /** Cancels a build-first cached driver that is fed two finite 20x20 inputs. */
    @TestTemplate
    void testHashFirstCancelMatchTaskWhileMatching() {
        addUniformInputs(20, 20, 20, 20);
        // NOTE(review): the non-cached strategy constant is paired with the cached driver here,
        // unlike the other build-first tests in this class — confirm this is intentional.
        configureJoin(new NirvanaOutputList(), DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        runDriverAndCancel(
                new BuildFirstCachedJoinDriver<Record, Record, Record>(), MockMatchStub.class);
    }

    /** Cancels a build-second cached driver that is fed two finite 20x20 inputs. */
    @TestTemplate
    void testHashSecondCancelMatchTaskWhileMatching() {
        addUniformInputs(20, 20, 20, 20);
        // NOTE(review): the non-cached strategy constant is paired with the cached driver here,
        // unlike the other build-second tests in this class — confirm this is intentional.
        configureJoin(new NirvanaOutputList(), DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        runDriverAndCancel(
                new BuildSecondCachedJoinDriver<Record, Record, Record>(), MockMatchStub.class);
    }

    /** Registers two uniform record generators as first and second input, in that order. */
    private void addUniformInputs(int keyCnt1, int valCnt1, int keyCnt2, int valCnt2) {
        addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
    }

    /** Applies the comparator, output, strategy and memory settings shared by every test. */
    private void configureJoin(List<Record> output, DriverStrategy strategy) {
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(output);
        getTaskConfig().setDriverStrategy(strategy);
        getTaskConfig().setRelativeMemoryDriver(1.0);
    }

    /**
     * Runs {@code driver} as a resettable driver with {@link MockMatchStub} and asserts that
     * {@link #outList} afterwards holds {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} records.
     */
    private void runJoinAndVerifySize(
            ResettableDriver<FlatJoinFunction<Record, Record, Record>, Record> driver,
            DriverStrategy strategy,
            int keyCnt1,
            int valCnt1,
            int keyCnt2,
            int valCnt2) {
        addUniformInputs(keyCnt1, valCnt1, keyCnt2, valCnt2);
        configureJoin(outList, strategy);

        try {
            testResettableDriver(driver, MockMatchStub.class, NUM_ITERATIONS);
        } catch (Exception e) {
            // Attach the cause so the failure report shows what actually went wrong.
            Assertions.fail("Test caused an exception.", e);
        }

        int expectedCount = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        Assertions.assertThat(outList).withFailMessage("Wrong result set size.").hasSize(expectedCount);
        outList.clear();
    }

    /**
     * Runs {@code driver} over two 20x20 inputs with {@link MockFailingMatchStub} and asserts that
     * the run throws {@link ExpectedTestException}.
     */
    private void runFailingJoin(
            ResettableDriver<FlatJoinFunction<Record, Record, Record>, Record> driver,
            DriverStrategy strategy) {
        addUniformInputs(20, 20, 20, 20);
        configureJoin(new NirvanaOutputList(), strategy);

        Assertions.assertThatThrownBy(
                        () -> testResettableDriver(driver, MockFailingMatchStub.class, NUM_ITERATIONS))
                .isInstanceOf(ExpectedTestException.class);
    }

    /**
     * Runs {@code driver} on a separate thread, starts a {@link TaskCancelThread} against it, waits
     * for both threads and asserts that the driver run returned without throwing.
     */
    private void runDriverAndCancel(
            Driver<FlatJoinFunction<Record, Record, Record>, Record> driver,
            Class<? extends FlatJoinFunction<Record, Record, Record>> stubClass) {
        AtomicBoolean success = new AtomicBoolean(false);

        Thread taskRunner =
                new Thread(
                        () -> {
                            try {
                                testDriver(driver, stubClass);
                                success.set(true);
                            } catch (Exception e) {
                                // The failure itself is reported through the flag below; the trace
                                // is printed so the reason is visible in the test output.
                                e.printStackTrace();
                            }
                        });
        taskRunner.start();

        TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
        tct.start();

        try {
            tct.join();
            taskRunner.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the test framework before failing.
            Thread.currentThread().interrupt();
            Assertions.fail("Joining threads failed", e);
        }

        Assertions.assertThat(success).withFailMessage(CANCEL_FAILURE_MESSAGE).isTrue();
    }

    /** Join function that emits the first record of every joined pair. */
    public static final class MockMatchStub
    extends RichFlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;

        @Override
        public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
            out.collect(record1);
        }
    }

    /**
     * Join function that emits the first record of each pair for its first nine calls and throws
     * {@link ExpectedTestException} on the tenth call and every call after it.
     */
    public static final class MockFailingMatchStub
    extends RichFlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;
        private int cnt = 0;

        @Override
        public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
            if (++this.cnt >= 10) {
                throw new ExpectedTestException();
            }
            out.collect(record1);
        }
    }
}

