/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over;

import java.util.ArrayList;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.SumAggsHandleFunction;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class ProcTimeUnboundedPrecedingFunctionTest {
    StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig((long)2000L);
    private static GeneratedAggsHandleFunction aggsHandleFunction = new GeneratedAggsHandleFunction("Function", "", new Object[0]){

        public AggsHandleFunction newInstance(ClassLoader classLoader) {
            return new SumAggsHandleFunction(1);
        }
    };
    private LogicalType[] inputFieldTypes = new LogicalType[]{VarCharType.STRING_TYPE, new BigIntType()};
    private LogicalType[] outputFieldTypes = new LogicalType[]{VarCharType.STRING_TYPE, new BigIntType(), new BigIntType()};
    private LogicalType[] accTypes = new LogicalType[]{new BigIntType()};
    private RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, this.inputFieldTypes);
    private TypeInformation<RowData> keyType = this.keySelector.getProducedType();
    private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(this.outputFieldTypes);

    ProcTimeUnboundedPrecedingFunctionTest() {
    }

    @Test
    void testStateTtl() throws Exception {
        ProcTimeUnboundedPrecedingFunction function = new ProcTimeUnboundedPrecedingFunction(this.ttlConfig, aggsHandleFunction, this.accTypes);
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)function);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((KeyedProcessOperator<RowData, RowData, RowData>)operator);
        testHarness.open();
        AbstractKeyedStateBackend stateBackend = (AbstractKeyedStateBackend)operator.getKeyedStateBackend();
        ((AbstractIntegerAssert)Assertions.assertThat((int)stateBackend.numKeyValueStateEntries()).as("Initial state is not empty", new Object[0])).isEqualTo(0);
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        testHarness.setStateTtlProcessingTime(2001L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("key", 1L, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key", 1L, 2L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key", 1L, 3L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key", 1L, 1L));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness(operator, (KeySelector)this.keySelector, this.keyType);
    }
}

