/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.codegen.runtimefilter;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.runtimefilter.RuntimeFilterCodeGenerator;
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
import org.apache.flink.table.runtime.operators.runtimefilter.LocalRuntimeFilterBuilderOperatorTest;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class RuntimeFilterCodeGeneratorTest {
    private StreamTaskMailboxTestHarness<RowData> testHarness;

    RuntimeFilterCodeGeneratorTest() {
    }

    @BeforeEach
    void setup() throws Exception {
        RowType leftType = RowType.of((LogicalType[])new LogicalType[]{new IntType(), new VarBinaryType()});
        RowType rightType = RowType.of((LogicalType[])new LogicalType[]{new VarCharType(), new IntType()});
        CodeGeneratorContext ctx = new CodeGeneratorContext((ReadableConfig)TableConfig.getDefault(), Thread.currentThread().getContextClassLoader());
        CodeGenOperatorFactory operatorFactory = RuntimeFilterCodeGenerator.gen((CodeGeneratorContext)ctx, (RowType)leftType, (RowType)rightType, (int[])new int[]{0});
        this.testHarness = new StreamTaskMailboxTestHarnessBuilder(TwoInputStreamTask::new, (TypeInformation)InternalTypeInfo.of((RowType)rightType)).setupOutputForSingletonOperatorChain((StreamOperatorFactory)operatorFactory).addInput((TypeInformation)InternalTypeInfo.of((RowType)leftType)).addInput((TypeInformation)InternalTypeInfo.of((RowType)rightType)).build();
    }

    @AfterEach
    void cleanup() throws Exception {
        if (this.testHarness != null) {
            this.testHarness.close();
        }
    }

    @Test
    void testNormalFilter() throws Exception {
        this.finishBuildPhase(RuntimeFilterCodeGeneratorTest.createNormalInput());
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var1", (int)111), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var3", (int)333), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var5", (int)555), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var6", (int)666), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var8", (int)888), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var9", (int)999), 1);
        this.testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 1);
        Assertions.assertThat(this.getOutputRowData()).containsExactly((Object[])new GenericRowData[]{GenericRowData.of((Object[])new Object[]{"var1", 111}), GenericRowData.of((Object[])new Object[]{"var3", 333}), GenericRowData.of((Object[])new Object[]{"var5", 555})});
    }

    @Test
    void testOverMaxRowCountLimitFilter() throws Exception {
        this.finishBuildPhase(RuntimeFilterCodeGeneratorTest.createOverMaxRowCountLimitInput());
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var1", (int)111), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var3", (int)333), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var5", (int)555), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var6", (int)666), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var8", (int)888), 1);
        this.testHarness.processElement((Object)LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord((String)"var9", (int)999), 1);
        this.testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 1);
        Assertions.assertThat(this.getOutputRowData()).containsExactly((Object[])new GenericRowData[]{GenericRowData.of((Object[])new Object[]{"var1", 111}), GenericRowData.of((Object[])new Object[]{"var3", 333}), GenericRowData.of((Object[])new Object[]{"var5", 555}), GenericRowData.of((Object[])new Object[]{"var6", 666}), GenericRowData.of((Object[])new Object[]{"var8", 888}), GenericRowData.of((Object[])new Object[]{"var9", 999})});
    }

    private void finishBuildPhase(StreamRecord<RowData> leftInput) throws Exception {
        this.testHarness.processElement(leftInput, 0);
        this.testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 0);
    }

    private List<GenericRowData> getOutputRowData() {
        return this.testHarness.getOutput().stream().map(record -> (RowData)((StreamRecord)record).getValue()).map(rowData -> {
            Assertions.assertThat((int)rowData.getArity()).isEqualTo(2);
            return GenericRowData.of((Object[])new Object[]{rowData.getString(0).toString(), rowData.getInt(1)});
        }).collect(Collectors.toList());
    }

    private static StreamRecord<RowData> createNormalInput() throws Exception {
        StreamTaskMailboxTestHarness localRuntimeFilterBuilder = LocalRuntimeFilterBuilderOperatorTest.createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements((int)5, (int)10);
        StreamRecord normalFilter = (StreamRecord)localRuntimeFilterBuilder.getOutput().poll();
        localRuntimeFilterBuilder.close();
        return normalFilter;
    }

    private static StreamRecord<RowData> createOverMaxRowCountLimitInput() {
        return new StreamRecord((Object)GenericRowData.of((Object[])new Object[]{-1, null}));
    }
}

