/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunctionTestBase;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing.AsyncStateRowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class RowTimeDeduplicateFunctionTest
extends RowTimeDeduplicateFunctionTestBase {
    private final boolean miniBatchEnable;
    private final boolean enableAsyncState;

    RowTimeDeduplicateFunctionTest(boolean miniBatchEnable, boolean enableAsyncState) {
        this.enableAsyncState = enableAsyncState;
        this.miniBatchEnable = miniBatchEnable;
    }

    @Parameters(name="miniBatchEnable = {0}, enableAsyncState = {1}")
    private static Collection<Boolean[]> runMode() {
        return Arrays.asList({false, false}, {false, true}, {true, false});
    }

    @TestTemplate
    void testRowTimeDeduplicateKeepFirstRow() throws Exception {
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        if (!this.enableAsyncState) {
            expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
            expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        }
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepFirstRow(true, true, expectedOutput);
        this.testRowTimeDeduplicateKeepFirstRow(true, false, expectedOutput);
        this.testRowTimeDeduplicateKeepFirstRow(false, true, expectedOutput);
        expectedOutput.clear();
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        if (!this.enableAsyncState) {
            expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
            expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
        }
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepFirstRow(false, false, expectedOutput);
    }

    @TestTemplate
    void testRowTimeDeduplicateKeepLastRow() throws Exception {
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key1", 12, 100L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key2", 11, 101L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        if (this.enableAsyncState) {
            expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key1", 12, 300L));
            expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
            expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key2", 11, 301L));
            expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
        } else {
            expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
            expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        }
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepLastRow(true, true, expectedOutput);
        this.testRowTimeDeduplicateKeepLastRow(true, false, expectedOutput);
        expectedOutput.clear();
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        if (this.enableAsyncState) {
            expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
            expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
        } else {
            expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
            expectedOutput.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        }
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepLastRow(false, true, expectedOutput);
        expectedOutput.clear();
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 13, 99L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 101L));
        expectedOutput.add(new Watermark(102L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key3", 5, 299L));
        expectedOutput.add(new Watermark(302L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
        expectedOutput.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
        expectedOutput.add(new Watermark(402L));
        this.testRowTimeDeduplicateKeepLastRow(false, false, expectedOutput);
    }

    private void testRowTimeDeduplicateKeepFirstRow(boolean generateUpdateBefore, boolean generateInsert, List<Object> expectedOutput) throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness;
        AsyncStateRowTimeDeduplicateFunction func;
        boolean keepLastRow = false;
        KeyedMapBundleOperator keyedMapBundleOperator = null;
        AsyncKeyedProcessOperator keyedProcessOperator = null;
        if (this.miniBatchEnable) {
            if (this.enableAsyncState) {
                throw new UnsupportedOperationException("Mini-batch deduplicate op is not supported async state api");
            }
            func = new RowTimeMiniBatchDeduplicateFunction(this.inputRowType, this.serializer, this.minTtlTime.toMillis(), this.rowTimeIndex, generateUpdateBefore, generateInsert, false);
            CountBundleTrigger trigger = new CountBundleTrigger(4L);
            keyedMapBundleOperator = new KeyedMapBundleOperator((MapBundleFunction)func, (BundleTrigger)trigger);
            testHarness = this.createTestHarness((OneInputStreamOperator<RowData, RowData>)keyedMapBundleOperator);
        } else {
            if (this.enableAsyncState) {
                func = new AsyncStateRowTimeDeduplicateFunction(this.inputRowType, this.minTtlTime.toMillis(), this.rowTimeIndex, generateUpdateBefore, generateInsert, false);
                keyedProcessOperator = new AsyncKeyedProcessOperator((KeyedProcessFunction)func);
            } else {
                func = new RowTimeDeduplicateFunction(this.inputRowType, this.minTtlTime.toMillis(), this.rowTimeIndex, generateUpdateBefore, generateInsert, false);
                keyedProcessOperator = new KeyedProcessOperator((KeyedProcessFunction)func);
            }
            testHarness = this.createTestHarness((OneInputStreamOperator<RowData, RowData>)keyedProcessOperator);
        }
        ArrayList<Object> actualOutput = new ArrayList<Object>();
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 101L));
        testHarness.processWatermark(new Watermark(102L));
        actualOutput.addAll(testHarness.getOutput());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        testHarness = this.miniBatchEnable ? this.createTestHarness((OneInputStreamOperator<RowData, RowData>)keyedMapBundleOperator) : this.createTestHarness((OneInputStreamOperator<RowData, RowData>)keyedProcessOperator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 300L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 301L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key3", 5, 299L));
        testHarness.processWatermark(new Watermark(302L));
        testHarness.setStateTtlProcessingTime(this.minTtlTime.toMillis() + 1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 400L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 401L));
        testHarness.processWatermark(402L);
        actualOutput.addAll(testHarness.getOutput());
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
        testHarness.close();
    }

    private void testRowTimeDeduplicateKeepLastRow(boolean generateUpdateBefore, boolean generateInsert, List<Object> expectedOutput) throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness;
        AsyncStateRowTimeDeduplicateFunction func;
        boolean keepLastRow = true;
        KeyedMapBundleOperator keyedMapBundleOperator = null;
        AsyncKeyedProcessOperator keyedProcessOperator = null;
        if (this.miniBatchEnable) {
            if (this.enableAsyncState) {
                throw new UnsupportedOperationException("Mini-batch deduplicate op is not supported async state api");
            }
            func = new RowTimeMiniBatchDeduplicateFunction(this.inputRowType, this.serializer, this.minTtlTime.toMillis(), this.rowTimeIndex, generateUpdateBefore, generateInsert, true);
            CountBundleTrigger trigger = new CountBundleTrigger(4L);
            keyedMapBundleOperator = new KeyedMapBundleOperator((MapBundleFunction)func, (BundleTrigger)trigger);
            testHarness = this.createTestHarness((OneInputStreamOperator<RowData, RowData>)keyedMapBundleOperator);
        } else {
            if (this.enableAsyncState) {
                func = new AsyncStateRowTimeDeduplicateFunction(this.inputRowType, this.minTtlTime.toMillis(), this.rowTimeIndex, generateUpdateBefore, generateInsert, true);
                keyedProcessOperator = new AsyncKeyedProcessOperator((KeyedProcessFunction)func);
            } else {
                func = new RowTimeDeduplicateFunction(this.inputRowType, this.minTtlTime.toMillis(), this.rowTimeIndex, generateUpdateBefore, generateInsert, true);
                keyedProcessOperator = new KeyedProcessOperator((KeyedProcessFunction)func);
            }
            testHarness = this.createTestHarness((OneInputStreamOperator<RowData, RowData>)keyedProcessOperator);
        }
        ArrayList<Object> actualOutput = new ArrayList<Object>();
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 101L));
        testHarness.processWatermark(new Watermark(102L));
        actualOutput.addAll(testHarness.getOutput());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        testHarness = this.miniBatchEnable ? this.createTestHarness((OneInputStreamOperator<RowData, RowData>)keyedMapBundleOperator) : this.createTestHarness((OneInputStreamOperator<RowData, RowData>)keyedProcessOperator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 300L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 301L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key3", 5, 299L));
        testHarness.processWatermark(new Watermark(302L));
        testHarness.setStateTtlProcessingTime(this.minTtlTime.toMillis() + 1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 12, 400L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 401L));
        testHarness.processWatermark(402L);
        actualOutput.addAll(testHarness.getOutput());
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
        testHarness.close();
    }
}

