/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.python.aggregate;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.runtime.dataview.DataViewSpec;
import org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperatorTest;
import org.apache.flink.table.runtime.operators.python.aggregate.PythonStreamGroupAggregateOperator;
import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
import org.apache.flink.table.runtime.utils.PassThroughStreamAggregatePythonFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;

class PythonStreamGroupAggregateOperatorTest
extends AbstractPythonStreamAggregateOperatorTest {
    PythonStreamGroupAggregateOperatorTest() {
    }

    @Test
    void testFlushDataOnClose() throws Exception {
        OneInputStreamOperatorTestHarness testHarness = this.getTestHarness(new Configuration());
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c1", 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)this.newRow(false, "c2", 1L), initialTime + 2L));
        testHarness.close();
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(false, "c2", 1L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    void testFinishBundleTriggeredOnCheckpoint() throws Exception {
        Configuration conf = new Configuration();
        conf.set(PythonOptions.MAX_BUNDLE_SIZE, (Object)10);
        OneInputStreamOperatorTestHarness testHarness = this.getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c1", 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c2", 1L), initialTime + 2L));
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c3", 2L), initialTime + 3L));
        testHarness.prepareSnapshotPreBarrier(0L);
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", 1L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c3", 2L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testFinishBundleTriggeredByCount() throws Exception {
        Configuration conf = new Configuration();
        conf.set(PythonOptions.MAX_BUNDLE_SIZE, (Object)3);
        OneInputStreamOperatorTestHarness testHarness = this.getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c1", 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c2", 1L), initialTime + 2L));
        this.assertOutputEquals("FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c3", 2L), initialTime + 2L));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", 1L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c3", 2L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testFinishBundleTriggeredByTime() throws Exception {
        Configuration conf = new Configuration();
        conf.set(PythonOptions.MAX_BUNDLE_SIZE, (Object)10);
        conf.set(PythonOptions.MAX_BUNDLE_TIME_MILLS, (Object)1000L);
        OneInputStreamOperatorTestHarness testHarness = this.getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c1", 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c2", 1L), initialTime + 2L));
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c3", 2L), initialTime + 3L));
        this.assertOutputEquals("FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", 1L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c3", 2L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testWatermarkProcessedOnFinishBundle() throws Exception {
        Configuration conf = new Configuration();
        conf.set(PythonOptions.MAX_BUNDLE_SIZE, (Object)10);
        OneInputStreamOperatorTestHarness testHarness = this.getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c1", 0L), initialTime + 1L));
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c2", 1L), initialTime + 2L));
        testHarness.processWatermark(initialTime + 2L);
        this.assertOutputEquals("Watermark has been processed", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", 1L)));
        expectedOutput.add(new Watermark(initialTime + 2L));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testStateCleanupTimer() throws Exception {
        Configuration conf = new Configuration();
        conf.setString("table.exec.state.ttl", "100");
        OneInputStreamOperatorTestHarness testHarness = this.getTestHarness(conf);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.setProcessingTime(0L);
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c1", 0L), initialTime + 1L));
        testHarness.setProcessingTime(500L);
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c2", 1L), initialTime + 2L));
        testHarness.setProcessingTime(599L);
        testHarness.processElement(new StreamRecord((Object)this.newRow(true, "c2", 2L), initialTime + 3L));
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c1", 0L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "state_cleanup_triggered: c1", 100L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", 1L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "c2", 2L)));
        expectedOutput.add(new StreamRecord((Object)this.newRow(true, "state_cleanup_triggered: c2", 699L)));
        this.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Override
    public OneInputStreamOperator getTestOperator(Configuration config) {
        long stateTtl = Long.valueOf(config.getString("table.exec.state.ttl", "0"));
        return new PassThroughPythonStreamGroupAggregateOperator(config, this.getInputType(), this.getOutputType(), new PythonAggregateFunctionInfo[]{new PythonAggregateFunctionInfo(PythonScalarFunctionOperatorTestBase.DummyPythonFunction.INSTANCE, (Object[])new Integer[]{0}, -1, false)}, this.getGrouping(), -1, false, false, stateTtl, stateTtl);
    }

    private static class PassThroughPythonStreamGroupAggregateOperator
    extends PythonStreamGroupAggregateOperator {
        PassThroughPythonStreamGroupAggregateOperator(Configuration config, RowType inputType, RowType outputType, PythonAggregateFunctionInfo[] aggregateFunctions, int[] grouping, int indexOfCountStar, boolean countStarInserted, boolean generateUpdateBefore, long minRetentionTime, long maxRetentionTime) {
            super(config, inputType, outputType, aggregateFunctions, new DataViewSpec[0][0], grouping, indexOfCountStar, countStarInserted, generateUpdateBefore, minRetentionTime, maxRetentionTime);
        }

        public PythonFunctionRunner createPythonFunctionRunner() {
            return new PassThroughStreamAggregatePythonFunctionRunner(this.getContainingTask().getEnvironment(), this.getRuntimeContext().getTaskInfo().getTaskName(), PythonTestUtils.createTestProcessEnvironmentManager(), this.userDefinedFunctionInputType, this.outputType, "flink:transform:stream_group_aggregate:v1", this.getUserDefinedFunctionsProto(), PythonTestUtils.createMockFlinkMetricContainer(), this.getKeyedStateBackend(), this.getKeySerializer(), this.getProcessFunction());
        }

        private Function<byte[], byte[]> getProcessFunction() {
            return input_bytes -> {
                try {
                    RowData input = (RowData)this.udfInputTypeSerializer.deserialize((DataInputView)new DataInputDeserializer(input_bytes));
                    DataOutputSerializer output = new DataOutputSerializer(1);
                    if (input.getByte(0) == 0) {
                        this.udfOutputTypeSerializer.serialize((Object)input.getRow(1, this.inputType.getFieldCount()), (DataOutputView)output);
                    } else {
                        this.udfOutputTypeSerializer.serialize((Object)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)("state_cleanup_triggered: " + input.getRow(3, this.getKeyType().getFieldCount()).getString(0))), input.getLong(2)}), (DataOutputView)output);
                    }
                    return output.getCopyOfBuffer();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            };
        }
    }
}

