/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.generated.FilterCondition;
import org.apache.flink.table.runtime.generated.GeneratedCollectorWrapper;
import org.apache.flink.table.runtime.generated.GeneratedFilterCondition;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;

class LookupJoinHarnessTest {
    /** Serializer for the harness input rows, built from the field types (INT, STRING). */
    private final TypeSerializer<RowData> inSerializer =
            new RowDataSerializer(
                    new LogicalType[] {
                        DataTypes.INT().getLogicalType(),
                        DataTypes.STRING().getLogicalType()
                    });

    /** Output assertor built from the joined field types (INT, STRING, INT, STRING). */
    private final RowDataHarnessAssertor assertor =
            new RowDataHarnessAssertor(
                    new LogicalType[] {
                        DataTypes.INT().getLogicalType(),
                        DataTypes.STRING().getLogicalType(),
                        DataTypes.INT().getLogicalType(),
                        DataTypes.STRING().getLogicalType()
                    });

    /** Package-private no-arg constructor; all state is set up by the field initializers. */
    LookupJoinHarnessTest() {
        // Intentionally empty.
    }

    /**
     * Inner join without a calc on the looked-up rows.
     *
     * <p>Feeds ids 1..6 (the last one with a null second field) and expects output only for the
     * ids that {@code TestingFetcherFunction} has rows for (1, 3 and 4); id 3 yields two joined
     * rows because the fetcher returns two rows for it.
     *
     * @throws Exception if the harness fails to open, process an element, or close
     */
    @Test
    void testTemporalInnerJoin() throws Exception {
        // try-with-resources closes the harness even when processing or the assertion fails.
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER)) {
            testHarness.open();

            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            testHarness.processElement(StreamRecordUtils.insertRecord(6, null));

            List<Object> expectedOutput = new ArrayList<>();
            expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
            expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));

            assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Inner join with {@code CalculateOnTemporalTable} applied to the looked-up rows.
     *
     * <p>Feeds ids 1..5. Compared with the unfiltered inner join, the row named "Jark" is not
     * expected in the output: the calc only forwards rows whose name is at least 6 bytes long.
     *
     * @throws Exception if the harness fails to open, process an element, or close
     */
    @Test
    void testTemporalInnerJoinWithFilter() throws Exception {
        // try-with-resources closes the harness even when processing or the assertion fails.
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER)) {
            testHarness.open();

            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));

            List<Object> expectedOutput = new ArrayList<>();
            expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));

            assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Left join without a calc on the looked-up rows.
     *
     * <p>Feeds ids 1..6 (the last one with a null second field). Every input row is expected in
     * the output: ids 1, 3 and 4 joined with the fetcher's rows, and ids 2, 5 and 6 padded with
     * nulls on the right side.
     *
     * @throws Exception if the harness fails to open, process an element, or close
     */
    @Test
    void testTemporalLeftJoin() throws Exception {
        // try-with-resources closes the harness even when processing or the assertion fails.
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER)) {
            testHarness.open();

            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            testHarness.processElement(StreamRecordUtils.insertRecord(6, null));

            List<Object> expectedOutput = new ArrayList<>();
            expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
            expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
            expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
            expectedOutput.add(StreamRecordUtils.insertRecord(6, null, null, null));

            assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Left join with {@code CalculateOnTemporalTable} applied to the looked-up rows.
     *
     * <p>Feeds ids 1..6 (the last one with a null second field). Every input row is expected in
     * the output; compared with the unfiltered left join, the row named "Jark" is missing because
     * the calc only forwards rows whose name is at least 6 bytes long.
     *
     * @throws Exception if the harness fails to open, process an element, or close
     */
    @Test
    void testTemporalLeftJoinWithFilter() throws Exception {
        // try-with-resources closes the harness even when processing or the assertion fails.
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER)) {
            testHarness.open();

            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            testHarness.processElement(StreamRecordUtils.insertRecord(6, null));

            List<Object> expectedOutput = new ArrayList<>();
            expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
            expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
            expectedOutput.add(StreamRecordUtils.insertRecord(6, null, null, null));

            assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Left join whose pre-filter is supplied through a {@link GeneratedFilterCondition} subclass
     * instead of a {@code GeneratedFunctionWrapper}.
     *
     * <p>The anonymous subclass overrides {@code newInstance} to hand out a {@code
     * TestingPreFilterCondition}. Feeds ids 1..6 (the last one with a null second field) and
     * expects every input row in the output, with ids 2, 5 and 6 padded with nulls on the right.
     *
     * @throws Exception if the harness fails to open, process an element, or close
     */
    @Test
    void testTemporalLeftJoinWithPreFilter() throws Exception {
        // NOTE(review): the raw (GeneratedFunction) cast is carried over unchanged because the
        // LookupJoinRunner constructor signature is not visible here - confirm whether it can be
        // dropped.
        LookupJoinRunner joinRunner =
                new LookupJoinRunner(
                        new GeneratedFunctionWrapper<>(new TestingFetcherFunction()),
                        new GeneratedCollectorWrapper<>(new TestingFetcherCollector()),
                        (GeneratedFunction)
                                new GeneratedFilterCondition("", "", new Object[0]) {
                                    @Override
                                    public FilterCondition newInstance(ClassLoader classLoader) {
                                        return new TestingPreFilterCondition();
                                    }
                                },
                        true,
                        2);
        ProcessOperator<RowData, RowData> operator = new ProcessOperator<>(joinRunner);

        // try-with-resources closes the harness even when processing or the assertion fails.
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                new OneInputStreamOperatorTestHarness<>(operator, inSerializer)) {
            testHarness.open();

            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            testHarness.processElement(StreamRecordUtils.insertRecord(6, null));

            List<Object> expectedOutput = new ArrayList<>();
            expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
            expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
            expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
            expectedOutput.add(StreamRecordUtils.insertRecord(6, null, null, null));

            assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Builds a test harness around a lookup join runner.
     *
     * <p>{@link FilterOnTable#WITHOUT_FILTER} selects a plain {@link LookupJoinRunner}; {@link
     * FilterOnTable#WITH_FILTER} selects a {@link LookupJoinWithCalcRunner} that additionally
     * runs {@code CalculateOnTemporalTable}. Both use {@code TestingFetcherFunction}, {@code
     * TestingFetcherCollector} and {@code TestingPreFilterCondition}.
     *
     * @param joinType whether the runner is created as a left join or an inner join
     * @param filterOnTable whether the calc on the looked-up rows is included
     * @return an unopened harness; the caller is responsible for opening and closing it
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(
            JoinType joinType, FilterOnTable filterOnTable) throws Exception {
        boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;

        // The trailing 2 is presumably the field count of the looked-up rows - confirm against
        // the runner constructors.
        ProcessFunction<RowData, RowData> joinRunner;
        if (filterOnTable == FilterOnTable.WITHOUT_FILTER) {
            joinRunner =
                    new LookupJoinRunner(
                            new GeneratedFunctionWrapper<>(new TestingFetcherFunction()),
                            new GeneratedCollectorWrapper<>(new TestingFetcherCollector()),
                            new GeneratedFunctionWrapper<>(new TestingPreFilterCondition()),
                            isLeftJoin,
                            2);
        } else {
            joinRunner =
                    new LookupJoinWithCalcRunner(
                            new GeneratedFunctionWrapper<>(new TestingFetcherFunction()),
                            new GeneratedFunctionWrapper<>(new CalculateOnTemporalTable()),
                            new GeneratedCollectorWrapper<>(new TestingFetcherCollector()),
                            new GeneratedFunctionWrapper<>(new TestingPreFilterCondition()),
                            isLeftJoin,
                            2);
        }

        ProcessOperator<RowData, RowData> operator = new ProcessOperator<>(joinRunner);
        return new OneInputStreamOperatorTestHarness<>(operator, inSerializer);
    }

    private static enum JoinType {
        INNER_JOIN,
        LEFT_JOIN;

    }

    private static enum FilterOnTable {
        WITH_FILTER,
        WITHOUT_FILTER;

    }

    public static final class TestingFetcherFunction
    implements FlatMapFunction<RowData, RowData> {
        private static final long serialVersionUID = 4018474964018227081L;
        private static final Map<Integer, List<GenericRowData>> data = new HashMap<Integer, List<GenericRowData>>();

        public void flatMap(RowData value, Collector<RowData> out) throws Exception {
            int id = value.getInt(0);
            List<GenericRowData> rows = data.get(id);
            if (rows != null) {
                for (GenericRowData row : rows) {
                    out.collect((Object)row);
                }
            }
        }

        static {
            data.put(1, Collections.singletonList(GenericRowData.of((Object[])new Object[]{1, StringData.fromString((String)"Julian")})));
            data.put(3, Arrays.asList(GenericRowData.of((Object[])new Object[]{3, StringData.fromString((String)"Jark")}), GenericRowData.of((Object[])new Object[]{3, StringData.fromString((String)"Jackson")})));
            data.put(4, Collections.singletonList(GenericRowData.of((Object[])new Object[]{4, StringData.fromString((String)"Fabian")})));
        }
    }

    public static final class TestingFetcherCollector
    extends ListenableCollector<RowData> {
        private static final long serialVersionUID = -312754413938303160L;

        public void collect(RowData record) {
            RowData left = (RowData)this.getInput();
            RowData right = record;
            this.getCollectListener().ifPresent(listener -> listener.onCollect((Object)record));
            this.outputResult(new JoinedRowData(left, right));
        }
    }

    public static final class TestingPreFilterCondition
    extends AbstractRichFunction
    implements FilterCondition {
        private static final long serialVersionUID = 1L;

        public void open(OpenContext context) throws Exception {
        }

        public void close() throws Exception {
        }

        public boolean apply(RowData in) {
            return !in.isNullAt(1);
        }
    }

    public static final class CalculateOnTemporalTable
    implements FlatMapFunction<RowData, RowData> {
        private static final long serialVersionUID = -1860345072157431136L;

        public void flatMap(RowData value, Collector<RowData> out) throws Exception {
            BinaryStringData name = (BinaryStringData)value.getString(1);
            if (name.getSizeInBytes() >= 6) {
                out.collect((Object)value);
            }
        }
    }
}

