/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window.groupwindow.operator;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.StateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.generated.NamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.CountWindow;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SessionWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.operator.AggregateWindowOperator;
import org.apache.flink.table.runtime.operators.window.groupwindow.operator.WindowOperator;
import org.apache.flink.table.runtime.operators.window.groupwindow.operator.WindowOperatorBuilder;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ElementTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.RowDataTestUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class WindowOperatorTest {
    // Time zones used as the second test parameter (see runMode()).
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
    // Test parameters injected through the constructor.
    private final boolean isTableAggregate;
    private final ZoneId shiftTimeZone;
    // Shared aggregate handlers; the getTimeWindowAggFunction()/getCountWindowAggFunction()
    // helpers pick between the plain and the table-aggregate variant based on isTableAggregate.
    private static final SumAndCountAggTimeWindow sumAndCountAggTimeWindow = new SumAndCountAggTimeWindow();
    private static final SumAndCountTableAggTimeWindow sumAndCountTableAggTimeWindow = new SumAndCountTableAggTimeWindow();
    private static final SumAndCountAggCountWindow sumAndCountAggCountWindow = new SumAndCountAggCountWindow();
    private static final SumAndCountTableAggCountWindow sumAndCountTableAggCountWindow = new SumAndCountTableAggCountWindow();
    // Counter that tests reset to 0 and later compare against the number of harness close() calls.
    // NOTE(review): the code that increments it is outside this excerpt - presumably the aggregate
    // handlers' close(); confirm.
    private static AtomicInteger closeCalled = new AtomicInteger(0);
    // Input row layout: (STRING key, INT value, BIGINT time); the event-time tests pass index 2 to
    // withEventTime(...).
    private LogicalType[] inputFieldTypes = new LogicalType[]{VarCharType.STRING_TYPE, new IntType(), new BigIntType()};
    // Output row layout: one STRING followed by five BIGINT fields. The expected rows in the tests
    // carry the key, two aggregate values and three window-derived timestamps.
    private InternalTypeInfo<RowData> outputType = InternalTypeInfo.ofFields((LogicalType[])new LogicalType[]{VarCharType.STRING_TYPE, new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()});
    private LogicalType[] aggResultTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
    private LogicalType[] accTypes = new LogicalType[]{new BigIntType(), new BigIntType()};
    private LogicalType[] windowTypes = new LogicalType[]{new BigIntType(), new BigIntType(), new BigIntType()};
    // The fields below are derived from the arrays above, so declaration order matters.
    private GenericRowEqualiser equaliser = new GenericRowEqualiser(this.accTypes, this.windowTypes);
    // Keys the stream by input field 0 (the STRING key).
    private RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, this.inputFieldTypes);
    private TypeInformation<RowData> keyType = this.keySelector.getProducedType();
    // Compares harness output with expected output after sorting; the comparator is built for
    // field 0 with STRING type.
    private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(this.outputType.toRowFieldTypes(), new GenericRowRecordSortComparator(0, (LogicalType)VarCharType.STRING_TYPE));

    @Parameters(name="isTableAggregate = {0}, TimeZone = {1}")
    private static Collection<Object[]> runMode() {
        return Arrays.asList({false, UTC_ZONE_ID}, {true, UTC_ZONE_ID}, {false, SHANGHAI_ZONE_ID}, {true, SHANGHAI_ZONE_ID});
    }

    /**
     * Creates one parameterized instance of the test.
     *
     * @param isTableAggregate whether the table-aggregate handlers are used instead of the plain
     *     aggregate handlers
     * @param shiftTimeZone time zone handed to the window operator builder
     */
    public WindowOperatorTest(boolean isTableAggregate, ZoneId shiftTimeZone) {
        this.shiftTimeZone = shiftTimeZone;
        this.isTableAggregate = isTableAggregate;
    }

    /**
     * Selects the time-window aggregate handler for the current parameterization.
     *
     * @return the table-aggregate handler when {@code isTableAggregate} is set, otherwise the
     *     plain aggregate handler
     */
    private NamespaceAggsHandleFunctionBase getTimeWindowAggFunction() {
        if (this.isTableAggregate) {
            return sumAndCountTableAggTimeWindow;
        }
        return sumAndCountAggTimeWindow;
    }

    /**
     * Selects the count-window aggregate handler for the current parameterization.
     *
     * @return the table-aggregate handler when {@code isTableAggregate} is set, otherwise the
     *     plain aggregate handler
     */
    private NamespaceAggsHandleFunctionBase getCountWindowAggFunction() {
        if (this.isTableAggregate) {
            return sumAndCountTableAggCountWindow;
        }
        return sumAndCountAggCountWindow;
    }

    /**
     * Wraps a record into a fresh queue, optionally twice.
     *
     * @param isDouble when {@code true} the same record instance is enqueued two times, otherwise
     *     once
     * @param record the record to enqueue
     * @return a new queue holding one or two references to {@code record}
     */
    private ConcurrentLinkedQueue<Object> doubleRecord(boolean isDouble, StreamRecord<RowData> record) {
        final int copies = isDouble ? 2 : 1;
        ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
        for (int i = 0; i < copies; i++) {
            queue.add(record);
        }
        return queue;
    }

    /**
     * Event-time sliding windows (3 s windows advancing by 1 s, as the expected window bounds
     * show), including a snapshot/restore cycle in the middle of the stream.
     *
     * <p>Expected rows are added through {@code doubleRecord}, so table-aggregate runs expect each
     * result row twice.
     */
    @TestTemplate
    void testEventTimeSlidingWindows() throws Exception {
        closeCalled.set(0);
        WindowOperator operator = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).withShiftTimezone(this.shiftTimeZone).sliding(Duration.ofSeconds(3L), Duration.ofSeconds(1L)).withEventTime(2).aggregateAndBuild(this.getTimeWindowAggFunction(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // Feed all elements up front; the timestamps (third field) are deliberately out of order.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));
        // Watermark 999 closes window [-2000, 1000): only the three key1 elements fall into it.
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(-2000L), this.localMills(1000L), this.localMills(999L))));
        expectedOutput.add(new Watermark(999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermark 1999 closes window [-1000, 2000): three elements per key.
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(-1000L), this.localMills(2000L), this.localMills(1999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(-1000L), this.localMills(2000L), this.localMills(1999L))));
        expectedOutput.add(new Watermark(1999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermark 2999 closes window [0, 3000).
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        expectedOutput.add(new Watermark(2999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Snapshot, close, and restore into a fresh harness; expectations restart from empty
        // because the new harness has a new output queue.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        // Window [1000, 4000) holds all five key2 elements.
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 5L, 5L, this.localMills(1000L), this.localMills(4000L), this.localMills(3999L))));
        expectedOutput.add(new Watermark(3999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Windows [2000, 5000) and [3000, 6000) only see the key2 elements at 3000 and 3999.
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 2L, 2L, this.localMills(2000L), this.localMills(5000L), this.localMills(4999L))));
        expectedOutput.add(new Watermark(4999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 2L, 2L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L))));
        expectedOutput.add(new Watermark(5999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // No data is left: later watermarks are only forwarded.
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        // The harness was closed twice in this test (before the restore and just above).
        ((AbstractIntegerAssert)Assertions.assertThat((int)closeCalled.get()).as("Close was not called.", new Object[0])).isEqualTo(2);
    }

    /**
     * Processing-time sliding windows (3 s windows advancing by 1 s, as the expected window bounds
     * show). Windows fire when the harness' processing time is advanced.
     *
     * <p>The operator is built with {@code withProcessingTime()}, so no row field is designated as
     * the time attribute; the third field is filled with {@code Long.MAX_VALUE}.
     */
    @TestTemplate
    void testProcessingTimeSlidingWindows() throws Throwable {
        closeCalled.set(0);
        WindowOperator operator = WindowOperatorBuilder.builder().withShiftTimezone(this.shiftTimeZone).withInputFields(this.inputFieldTypes).sliding(Duration.ofSeconds(3L), Duration.ofSeconds(1L)).withProcessingTime().aggregateAndBuild(this.getTimeWindowAggFunction(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        // One key2 element at processing time 3; advancing to 1000 fires window [-2000, 1000).
        testHarness.setProcessingTime(3L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(1000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(-2000L), this.localMills(1000L), this.localMills(999L))));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Two more key2 elements at time 1000; advancing to 2000 fires window [-1000, 2000).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(2000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(-1000L), this.localMills(2000L), this.localMills(1999L))));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Two key1 elements at time 2000; advancing to 3000 fires window [0, 3000) for both keys.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(3000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 2L, 2L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Three key1 elements at time 3000; jumping to 7000 fires every remaining window at once.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(7000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 2L, 2L, this.localMills(1000L), this.localMills(4000L), this.localMills(3999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 5L, 5L, this.localMills(1000L), this.localMills(4000L), this.localMills(3999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 5L, 5L, this.localMills(2000L), this.localMills(5000L), this.localMills(4999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L))));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Event-time cumulative windows: the expected rows show windows that share a start and grow in
     * 1 s steps up to 3 s ([0, 1000), [0, 2000), [0, 3000), then [3000, 4000), ...). Includes a
     * snapshot/restore cycle in the middle of the stream.
     */
    @TestTemplate
    void testEventTimeCumulativeWindows() throws Exception {
        closeCalled.set(0);
        WindowOperator operator = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).withShiftTimezone(this.shiftTimeZone).cumulative(Duration.ofSeconds(3L), Duration.ofSeconds(1L)).withEventTime(2).aggregateAndBuild(this.getTimeWindowAggFunction(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // Feed all elements up front; the timestamps (third field) are deliberately out of order.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));
        // Watermark 999 closes [0, 1000): only the three key1 elements are inside.
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(1000L), this.localMills(999L))));
        expectedOutput.add(new Watermark(999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermark 1999 closes [0, 2000): key2 now contributes 1000, 1998 and 1999.
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(2000L), this.localMills(1999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(2000L), this.localMills(1999L))));
        expectedOutput.add(new Watermark(1999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermark 2999 closes [0, 3000): key2 additionally includes the element at 2999.
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 4L, 4L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        expectedOutput.add(new Watermark(2999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Snapshot, close, and restore into a fresh harness; expectations restart from empty
        // because the new harness has a new output queue.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        // Only the key2 element at 3000 remains; it shows up in [3000, 4000), [3000, 5000) and
        // [3000, 6000) as successive watermarks pass.
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(4000L), this.localMills(3999L))));
        expectedOutput.add(new Watermark(3999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(5000L), this.localMills(4999L))));
        expectedOutput.add(new Watermark(4999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L))));
        expectedOutput.add(new Watermark(5999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // No data is left: later watermarks are only forwarded.
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        // The harness was closed twice in this test (before the restore and just above).
        ((AbstractIntegerAssert)Assertions.assertThat((int)closeCalled.get()).as("Close was not called.", new Object[0])).isEqualTo(2);
    }

    /**
     * Event-time cumulative windows with 500 ms allowed lateness and update production enabled.
     *
     * <p>Always uses a fresh {@code SumAndCountAggTimeWindow}, independent of the
     * {@code isTableAggregate} parameter. The whole output is asserted once at the end, together
     * with the late-records-dropped counter.
     */
    @TestTemplate
    void testEventTimeCumulativeWindowsWithLateArrival() throws Exception {
        WindowOperator operator = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).withShiftTimezone(this.shiftTimeZone).cumulative(Duration.ofSeconds(3L), Duration.ofSeconds(1L)).withEventTime(2).withAllowedLateness(Duration.ofMillis(500L)).produceUpdates().aggregateAndBuild((NamespaceAggsHandleFunctionBase)new SumAndCountAggTimeWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        // Element at 500, then watermark 1500: window [0, 1000) is emitted with one element.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 500L));
        testHarness.processWatermark(new Watermark(1500L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(0L), this.localMills(1000L), this.localMills(999L)));
        expectedOutput.add(new Watermark(1500L));
        // Element at 1300 arrives after watermark 1500 and is counted in [0, 2000), which is
        // emitted once watermark 2300 passes.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1300L));
        testHarness.processWatermark(new Watermark(2300L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, this.localMills(0L), this.localMills(2000L), this.localMills(1999L)));
        expectedOutput.add(new Watermark(2300L));
        // Element at 1997 arrives after [0, 2000) was already emitted: the expectation is a
        // retraction/update pair for that window, followed by the first result for [0, 3000).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1997L));
        testHarness.processWatermark(new Watermark(6000L));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("key2", 2L, 2L, this.localMills(0L), this.localMills(2000L), this.localMills(1999L)));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(2000L), this.localMills(1999L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        expectedOutput.add(new Watermark(6000L));
        // Element at 1998 arrives after watermark 6000: no output is expected for it, and the
        // dropped-late-records counter is expected to be 1.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processWatermark(new Watermark(7000L));
        expectedOutput.add(new Watermark(7000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        testHarness.close();
    }

    /**
     * Processing-time cumulative windows: the expected rows show windows that share a start and
     * grow in 1 s steps up to 3 s. Windows fire when the harness' processing time is advanced.
     *
     * <p>The operator is built with {@code withProcessingTime()}, so no row field is designated as
     * the time attribute; the third field is filled with {@code Long.MAX_VALUE}.
     */
    @TestTemplate
    void testProcessingTimeCumulativeWindows() throws Throwable {
        closeCalled.set(0);
        WindowOperator operator = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).withShiftTimezone(this.shiftTimeZone).cumulative(Duration.ofSeconds(3L), Duration.ofSeconds(1L)).withProcessingTime().aggregateAndBuild(this.getTimeWindowAggFunction(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        // One key2 element at processing time 3; advancing to 1000 fires window [0, 1000).
        testHarness.setProcessingTime(3L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(1000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(0L), this.localMills(1000L), this.localMills(999L))));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Two more key2 elements at time 1000; advancing to 2000 fires window [0, 2000).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(2000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(2000L), this.localMills(1999L))));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Two key1 elements at time 2000; advancing to 3000 fires window [0, 3000) for both keys.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(3000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 2L, 2L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Elements at time 3000 (two for key1, one for key2); jumping to 7000 fires
        // [3000, 4000), [3000, 5000) and [3000, 6000) for both keys at once.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(7000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 2L, 2L, this.localMills(3000L), this.localMills(4000L), this.localMills(3999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(4000L), this.localMills(3999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 2L, 2L, this.localMills(3000L), this.localMills(5000L), this.localMills(4999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(5000L), this.localMills(4999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 2L, 2L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L))));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Event-time tumbling windows of 3 s, with a snapshot/restore cycle before the first window
     * fires.
     *
     * <p>Expected rows are added through {@code doubleRecord}, so table-aggregate runs expect each
     * result row twice.
     */
    @TestTemplate
    void testEventTimeTumblingWindows() throws Exception {
        closeCalled.set(0);
        WindowOperator operator = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).withShiftTimezone(this.shiftTimeZone).tumble(Duration.ofSeconds(3L)).withEventTime(2).aggregateAndBuild(this.getTimeWindowAggFunction(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        // Feed all elements up front; the timestamps (third field) are deliberately out of order.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));
        // Watermarks 999 and 1999 are before the end of [0, 3000): only the watermarks are expected.
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Snapshot, close, and restore into a fresh harness; expectations restart from empty
        // because the new harness has a new output queue.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        // Watermark 2999 closes [0, 3000): three elements per key, computed from restored state.
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L))));
        expectedOutput.add(new Watermark(2999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermarks 3999 and 4999 are before the end of [3000, 6000): only watermarks expected.
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermark 5999 closes [3000, 6000): the key2 elements at 3000 and 3999.
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 2L, 2L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L))));
        expectedOutput.add(new Watermark(5999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // No data is left: later watermarks are only forwarded.
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        // The harness was closed twice in this test (before the restore and just above).
        ((AbstractIntegerAssert)Assertions.assertThat((int)closeCalled.get()).as("Close was not called.", new Object[0])).isEqualTo(2);
    }

    /**
     * Exercises a 3-second event-time tumbling window whose trigger fires at the end of the window
     * and, in addition, fires early every second of processing time, with update output enabled.
     *
     * <p>The scenario is split in two phases by a snapshot/restore cycle: the first phase checks
     * the early (processing-time) firings, the second phase checks the update-before/update-after
     * pairs emitted as windows receive more data and the watermark advances.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testEventTimeTumblingWindowsWithEarlyFiring() throws Exception {
        closeCalled.set(0);
        // withEventTime(2): presumably the index of the rowtime field in the input row -- TODO confirm.
        AggregateWindowOperator operator = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).tumble(Duration.ofSeconds(3L)).withShiftTimezone(this.shiftTimeZone).withEventTime(2).triggering((Trigger)EventTimeTriggers.afterEndOfWindow().withEarlyFirings((Trigger)ProcessingTimeTriggers.every((Duration)Duration.ofSeconds(1L)))).produceUpdates().aggregate((NamespaceAggsHandleFunction)new SumAndCountAggTimeWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((WindowOperator)operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        // Processing time 0: two key2 elements whose timestamps fall into window [3000, 6000).
        testHarness.setProcessingTime(0L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));
        // Processing time 1: three elements per key whose timestamps fall into window [0, 3000).
        testHarness.setProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));
        // At processing time 1000 only the key2 window [3000, 6000) is expected to fire early.
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // At processing time 1001 both [0, 3000) windows are expected to fire early.
        testHarness.setProcessingTime(1001L);
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        testHarness.processWatermark(new Watermark(1999L));
        // No additional record is expected at processing time 2001; only the watermark is added.
        testHarness.setProcessingTime(2001L);
        expectedOutput.add(new Watermark(1999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Snapshot the operator, then restore the state into a fresh harness for the same operator.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        // Expectations restart from empty because the new harness has its own output queue.
        expectedOutput.clear();
        testHarness = this.createTestHarness((WindowOperator)operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        // Watermark 2999 reaches the max timestamp of the [0, 3000) windows; only the watermark
        // itself is expected (presumably because the results are unchanged since the early
        // firing -- TODO confirm).
        testHarness.setProcessingTime(3001L);
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new Watermark(2999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // A third key2 element for [3000, 6000); advancing processing time to 4001 is expected to
        // produce an update-before/update-after pair going from (2, 2) to (3, 3).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 4999L));
        testHarness.processWatermark(new Watermark(3999L));
        testHarness.setProcessingTime(4001L);
        expectedOutput.add(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("key2", 2L, 2L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key2", 3L, 3L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Elements with timestamps in [0, 3000) arrive after watermark 3999; no output is expected.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2001L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 2030L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // A fourth key2 element for [3000, 6000) is added at processing time 5100; nothing but the
        // watermark is expected until a later firing.
        testHarness.setProcessingTime(5100L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 5122L));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermark 5999 reaches the max timestamp of [3000, 6000): the result is expected to be
        // updated from (3, 3) to (4, 4).
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("key2", 3L, 3L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key2", 4L, 4L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        expectedOutput.add(new Watermark(5999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Advancing processing time after the final firing is expected to emit nothing further.
        testHarness.setProcessingTime(6001L);
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Elements far behind the watermark are expected to produce no output.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2877L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 2899L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        // The harness was closed twice (before and after the restore), hence a count of 2.
        ((AbstractIntegerAssert)Assertions.assertThat((int)closeCalled.get()).as("Close was not called.", new Object[0])).isEqualTo(2);
    }

    /**
     * Exercises a 3-second event-time tumbling window with early firings every second of
     * processing time, late firings on every element, an allowed lateness of 3 seconds and update
     * output enabled.
     *
     * <p>The first phase (before the snapshot/restore cycle) mirrors the early-firing-only
     * scenario; the second phase additionally checks that elements arriving after the end of
     * their window, but within the allowed lateness, immediately produce update pairs.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testEventTimeTumblingWindowsWithEarlyAndLateFirings() throws Exception {
        closeCalled.set(0);
        // withEventTime(2): presumably the index of the rowtime field in the input row -- TODO confirm.
        AggregateWindowOperator operator = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).withShiftTimezone(this.shiftTimeZone).tumble(Duration.ofSeconds(3L)).withEventTime(2).triggering(EventTimeTriggers.afterEndOfWindow().withEarlyFirings((Trigger)ProcessingTimeTriggers.every((Duration)Duration.ofSeconds(1L))).withLateFirings((Trigger)ElementTriggers.every())).withAllowedLateness(Duration.ofSeconds(3L)).produceUpdates().aggregate((NamespaceAggsHandleFunction)new SumAndCountAggTimeWindow(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, this.windowTypes).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((WindowOperator)operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        // Processing time 0: two key2 elements whose timestamps fall into window [3000, 6000).
        testHarness.setProcessingTime(0L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 3000L));
        // Processing time 1: three elements per key whose timestamps fall into window [0, 3000).
        testHarness.setProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1999L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));
        // At processing time 1000 only the key2 window [3000, 6000) is expected to fire early.
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // At processing time 1001 both [0, 3000) windows are expected to fire early.
        testHarness.setProcessingTime(1001L);
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        testHarness.processWatermark(new Watermark(1999L));
        // No additional record is expected at processing time 2001; only the watermark is added.
        testHarness.setProcessingTime(2001L);
        expectedOutput.add(new Watermark(1999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Snapshot the operator, then restore the state into a fresh harness for the same operator.
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        // Expectations restart from empty because the new harness has its own output queue.
        expectedOutput.clear();
        testHarness = this.createTestHarness((WindowOperator)operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        // Watermark 2999 reaches the max timestamp of the [0, 3000) windows; only the watermark
        // itself is expected (presumably because the results are unchanged since the early
        // firing -- TODO confirm).
        testHarness.setProcessingTime(3001L);
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new Watermark(2999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // A third key2 element for [3000, 6000); advancing processing time to 4001 is expected to
        // produce an update-before/update-after pair going from (2, 2) to (3, 3).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 4999L));
        testHarness.processWatermark(new Watermark(3999L));
        testHarness.setProcessingTime(4001L);
        expectedOutput.add(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("key2", 2L, 2L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key2", 3L, 3L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Late element for key2 in [0, 3000): with per-element late firing, an update pair from
        // (3, 3) to (4, 4) is expected right away.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2001L));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key2", 4L, 4L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Same for key1: the late element is expected to update [0, 3000) from (3, 3) to (4, 4).
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 2030L));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key1", 4L, 4L, this.localMills(0L), this.localMills(3000L), this.localMills(2999L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // A fourth key2 element for [3000, 6000) is added at processing time 5100; nothing but the
        // watermark is expected until a later firing.
        testHarness.setProcessingTime(5100L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 5122L));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Watermark 5999 reaches the max timestamp of [3000, 6000): the result is expected to be
        // updated from (3, 3) to (4, 4).
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("key2", 3L, 3L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("key2", 4L, 4L, this.localMills(3000L), this.localMills(6000L), this.localMills(5999L)));
        expectedOutput.add(new Watermark(5999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Advancing processing time after the on-time firing is expected to emit nothing further.
        testHarness.setProcessingTime(6001L);
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // With the watermark at 7999 these [0, 3000) elements are expected to produce no output
        // (presumably they are beyond the 3-second allowed lateness -- TODO confirm).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 2877L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 2899L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        // The harness was closed twice (before and after the restore), hence a count of 2.
        ((AbstractIntegerAssert)Assertions.assertThat((int)closeCalled.get()).as("Close was not called.", new Object[0])).isEqualTo(2);
    }

    /**
     * Checks that a 3-second processing-time tumbling window emits one aggregate per key once
     * processing time has moved past the end of the window, and that the watermark-latency gauge
     * reports 0.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testProcessingTimeTumblingWindows() throws Exception {
        final String failureMessage = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator tumblingOperator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .withShiftTimezone(shiftTimeZone)
                        .tumble(Duration.ofSeconds(3L))
                        .withProcessingTime()
                        .aggregateAndBuild(
                                getTimeWindowAggFunction(),
                                (RecordEqualiser) equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(tumblingOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // Processing time 3: three key2 elements followed by two key1 elements.
        harness.setProcessingTime(3L);
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, Long.MAX_VALUE));
        for (int i = 0; i < 2; i++) {
            harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 7000L));
        }
        for (int i = 0; i < 2; i++) {
            harness.processElement(StreamRecordUtils.insertRecord("key1", 1, 7000L));
        }

        // Moving to processing time 5000 is expected to emit window [0, 3000) for both keys.
        harness.setProcessingTime(5000L);
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key2", 3L, 3L, localMills(0L), localMills(3000L), localMills(2999L))));
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key1", 2L, 2L, localMills(0L), localMills(3000L), localMills(2999L))));
        assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

        // Three more key1 elements arrive at processing time 5000, i.e. in window [3000, 6000).
        for (int i = 0; i < 3; i++) {
            harness.processElement(StreamRecordUtils.insertRecord("key1", 1, 7000L));
        }

        // Moving to processing time 7000 is expected to emit that second window.
        harness.setProcessingTime(7000L);
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key1", 3L, 3L, localMills(3000L), localMills(6000L), localMills(5999L))));
        Assertions.assertThat((Object) tumblingOperator.getWatermarkLatency().getValue())
                .isEqualTo((Object) 0L);
        assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

        harness.close();
    }

    /**
     * Verifies event-time session windows with a 3-second gap across a snapshot/restore cycle,
     * and checks the watermark-latency gauge and the late-record counter along the way.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testEventTimeSessionWindows() throws Exception {
        final String failureMessage = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator sessionOperator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .withShiftTimezone(shiftTimeZone)
                        .session(Duration.ofSeconds(3L))
                        .withEventTime(2)
                        .aggregateAndBuild(
                                getTimeWindowAggFunction(),
                                (RecordEqualiser) equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(sessionOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // First batch of elements; no watermark is sent before the snapshot.
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 0L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 2, 1000L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 3, 2500L));
        harness.processElement(StreamRecordUtils.insertRecord("key1", 1, 10L));
        harness.processElement(StreamRecordUtils.insertRecord("key1", 2, 1000L));

        // Snapshot, shut down, and restore into a fresh harness around the same operator.
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness.close();
        expected.clear();
        harness = createTestHarness(sessionOperator);
        harness.setup();
        harness.initializeState(checkpoint);
        harness.open();

        // Right after the restore the latency gauge is expected to report 0.
        Assertions.assertThat((Object) sessionOperator.getWatermarkLatency().getValue())
                .isEqualTo((Object) 0L);

        harness.processElement(StreamRecordUtils.insertRecord("key1", 3, 2500L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 4, 5501L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 5, 6000L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 5, 6000L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 6, 6050L));

        // Watermark 12000 is expected to flush three sessions: one for key1, two for key2.
        harness.processWatermark(new Watermark(12000L));
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key1", 6L, 3L, localMills(10L), localMills(5500L), localMills(5499L))));
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key2", 6L, 3L, localMills(0L), localMills(5500L), localMills(5499L))));
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key2", 20L, 4L, localMills(5501L), localMills(9050L), localMills(9049L))));
        expected.add(new Watermark(12000L));

        // The key1 element at 4000 is behind the watermark; the two key2 elements form a new session.
        harness.processElement(StreamRecordUtils.insertRecord("key1", 3, 4000L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 10, 15000L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 20, 15000L));
        harness.processWatermark(new Watermark(17999L));
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key2",
                                30L,
                                2L,
                                localMills(15000L),
                                localMills(18000L),
                                localMills(17999L))));
        expected.add(new Watermark(17999L));
        assertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

        // With processing time 18000 and last watermark 17999 the gauge is expected to report 1.
        harness.setProcessingTime(18000L);
        Assertions.assertThat((Object) sessionOperator.getWatermarkLatency().getValue())
                .isEqualTo((Object) 1L);

        harness.close();
        // Two close() calls: one before the restore, one at the end.
        ((AbstractIntegerAssert) Assertions.assertThat((int) closeCalled.get())
                        .as("Close was not called.", new Object[0]))
                .isEqualTo(2);
        // Exactly one record is expected to have been dropped as late.
        Assertions.assertThat((long) sessionOperator.getNumLateRecordsDropped().getCount())
                .isEqualTo(1L);
    }

    /**
     * Verifies processing-time session windows with a 3-second gap: a session is emitted once
     * processing time has advanced beyond the last element of the session plus the gap.
     *
     * <p>Output is compared with a dedicated assertor built from a comparator on field 0 rather
     * than with the shared fixture assertor.
     *
     * @throws Throwable if the test harness fails
     */
    @TestTemplate
    void testProcessingTimeSessionWindows() throws Throwable {
        final String failureMessage = "Output was not correct.";
        closeCalled.set(0);

        WindowOperator sessionOperator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .withShiftTimezone(shiftTimeZone)
                        .session(Duration.ofSeconds(3L))
                        .withProcessingTime()
                        .aggregateAndBuild(
                                getTimeWindowAggFunction(),
                                (RecordEqualiser) equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(sessionOperator);
        // Local assertor used instead of the fixture's one for every comparison in this test.
        RowDataHarnessAssertor keySortedAssertor =
                new RowDataHarnessAssertor(
                        outputType.toRowFieldTypes(),
                        new GenericRowRecordSortComparator(0, (LogicalType) VarCharType.STRING_TYPE));
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // Two key2 elements at processing times 3 and 1000 are expected to share one session.
        harness.setProcessingTime(3L);
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1L));
        harness.setProcessingTime(1000L);
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1002L));

        // At processing time 5000 the session [3, 4000) is expected to be emitted.
        harness.setProcessingTime(5000L);
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key2", 2L, 2L, localMills(3L), localMills(4000L), localMills(3999L))));
        keySortedAssertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

        // Second round at processing time 5000: two key2 elements and three key1 elements.
        for (int i = 0; i < 2; i++) {
            harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 5000L));
        }
        for (int i = 0; i < 3; i++) {
            harness.processElement(StreamRecordUtils.insertRecord("key1", 1, 5000L));
        }

        // At processing time 10000 both sessions [5000, 8000) are expected to be emitted.
        harness.setProcessingTime(10000L);
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key2", 2L, 2L, localMills(5000L), localMills(8000L), localMills(7999L))));
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key1", 3L, 3L, localMills(5000L), localMills(8000L), localMills(7999L))));
        keySortedAssertor.assertOutputEqualsSorted(failureMessage, expected, harness.getOutput());

        harness.close();
    }

    /**
     * Runs the operator with a {@code PointSessionWindowAssigner(3000)} on event time, across a
     * snapshot/restore cycle, and checks the single merged window emitted per key.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testPointSessions() throws Exception {
        closeCalled.set(0);

        WindowOperator pointSessionOperator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .withShiftTimezone(shiftTimeZone)
                        .assigner((GroupWindowAssigner) new PointSessionWindowAssigner(3000L))
                        .withEventTime(2)
                        .aggregateAndBuild(
                                getTimeWindowAggFunction(),
                                (RecordEqualiser) equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(pointSessionOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // Two key2 elements are processed before the snapshot is taken.
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 0L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 33, 1000L));

        // Snapshot, shut down, and restore into a fresh harness around the same operator.
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness.close();
        harness = createTestHarness(pointSessionOperator);
        harness.setup();
        harness.initializeState(checkpoint);
        harness.open();

        // Remaining elements: one more for key2, three for key1.
        harness.processElement(StreamRecordUtils.insertRecord("key2", 33, 2500L));
        harness.processElement(StreamRecordUtils.insertRecord("key1", 1, 10L));
        harness.processElement(StreamRecordUtils.insertRecord("key1", 2, 1000L));
        harness.processElement(StreamRecordUtils.insertRecord("key1", 33, 2500L));

        // Watermark 12000 is expected to emit one window per key.
        harness.processWatermark(new Watermark(12000L));
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key1", 36L, 3L, localMills(10L), localMills(4000L), localMills(3999L))));
        expected.addAll(
                doubleRecord(
                        isTableAggregate,
                        StreamRecordUtils.insertRecord(
                                "key2", 67L, 3L, localMills(0L), localMills(3000L), localMills(2999L))));
        expected.add(new Watermark(12000L));
        assertor.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput());

        harness.close();
        // Two close() calls: one before the restore, one at the end.
        ((AbstractIntegerAssert) Assertions.assertThat((int) closeCalled.get())
                        .as("Close was not called.", new Object[0]))
                .isEqualTo(2);
    }

    /**
     * Verifies the handling of late elements for a 2-second event-time tumbling window with an
     * allowed lateness of 500 ms and update output enabled: a late element within the allowed
     * lateness updates the already emitted result, a later one is counted as dropped.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testLateness() throws Exception {
        WindowOperator latenessOperator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .withShiftTimezone(shiftTimeZone)
                        .tumble(Duration.ofSeconds(2L))
                        .withEventTime(2)
                        .withAllowedLateness(Duration.ofMillis(500L))
                        .produceUpdates()
                        .aggregateAndBuild(
                                (NamespaceAggsHandleFunctionBase) new SumAndCountAggTimeWindow(),
                                (RecordEqualiser) equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(latenessOperator);
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        harness.open();

        // First element of window [0, 2000); watermark 1500 is still inside the window.
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 500L));
        harness.processWatermark(new Watermark(1500L));
        expected.add(new Watermark(1500L));

        // Second element; watermark 2300 passes the window end, the result (2, 2) is expected.
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1300L));
        harness.processWatermark(new Watermark(2300L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 2L, 2L, localMills(0L), localMills(2000L), localMills(1999L)));
        expected.add(new Watermark(2300L));

        // Late element for the same window: the result is expected to be updated to (3, 3).
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1997L));
        harness.processWatermark(new Watermark(6000L));
        expected.add(
                StreamRecordUtils.updateBeforeRecord(
                        "key2", 2L, 2L, localMills(0L), localMills(2000L), localMills(1999L)));
        expected.add(
                StreamRecordUtils.updateAfterRecord(
                        "key2", 3L, 3L, localMills(0L), localMills(2000L), localMills(1999L)));
        expected.add(new Watermark(6000L));

        // Another element for the same window after watermark 6000: no output is expected for it.
        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1998L));
        harness.processWatermark(new Watermark(7000L));
        expected.add(new Watermark(7000L));

        assertor.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput());
        // Exactly one record is expected to have been dropped as late.
        Assertions.assertThat((long) latenessOperator.getNumLateRecordsDropped().getCount())
                .isEqualTo(1L);

        harness.close();
    }

    /**
     * Checks that, for a tumbling event-time window with a small allowed lateness, the window
     * result is emitted exactly once and that the watermarks passing the window end and the
     * lateness bound afterwards produce no further records.
     *
     * <p>The window size and the allowed lateness are declared once as locals and fed into the
     * operator builder, so the configuration has a single source of truth.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception {
        // Window size in seconds and allowed lateness in milliseconds.
        final int windowSize = 2;
        final long lateness = 1L;
        WindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(this.inputFieldTypes)
                        .withShiftTimezone(this.shiftTimeZone)
                        .tumble(Duration.ofSeconds(windowSize))
                        .withEventTime(2)
                        .withAllowedLateness(Duration.ofMillis(lateness))
                        .produceUpdates()
                        .aggregateAndBuild(
                                (NamespaceAggsHandleFunctionBase) new SumAndCountAggTimeWindow(),
                                (RecordEqualiser) this.equaliser,
                                this.accTypes,
                                this.aggResultTypes,
                                this.windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();

        // A single element in window [0, 2000).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 1000L));

        // Watermarks before the window end (1599), at its max timestamp (1999), and beyond it
        // (2000, 5000).
        testHarness.processWatermark(new Watermark(1599L));
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.processWatermark(new Watermark(2000L));
        testHarness.processWatermark(new Watermark(5000L));

        // Only one window result is expected, alongside the four forwarded watermarks.
        expected.add(new Watermark(1599L));
        expected.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(0L), this.localMills(2000L), this.localMills(1999L)));
        expected.add(new Watermark(1999L));
        expected.add(new Watermark(2000L));
        expected.add(new Watermark(5000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Checks that a window whose max timestamp plus the allowed lateness overflows {@code long}
     * still emits its result: a watermark below the window's max timestamp must not discard the
     * element, and the watermark at the max timestamp must trigger the emission.
     *
     * <p>The test body is skipped unless the shift time zone equals {@code UTC_ZONE_ID}.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testCleanupTimeOverflow() throws Exception {
        if (!UTC_ZONE_ID.equals(shiftTimeZone)) {
            return;
        }
        final long windowSize = 1000L;
        final long lateness = 2000L;
        // Timestamps close enough to Long.MAX_VALUE for "max timestamp + lateness" to wrap around.
        final long elementTimestamp = Long.MAX_VALUE - 1750L;
        final long earlyWatermark = Long.MAX_VALUE - 1500L;

        WindowOperator overflowOperator =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFieldTypes)
                        .withShiftTimezone(shiftTimeZone)
                        .tumble(Duration.ofMillis(windowSize))
                        .withEventTime(2)
                        .withAllowedLateness(Duration.ofMillis(lateness))
                        .produceUpdates()
                        .aggregateAndBuild(
                                (NamespaceAggsHandleFunctionBase) new SumAndCountAggTimeWindow(),
                                (RecordEqualiser) equaliser,
                                accTypes,
                                aggResultTypes,
                                windowTypes);
        KeyedOneInputStreamOperatorTestHarness harness =
                new KeyedOneInputStreamOperatorTestHarness(
                        (OneInputStreamOperator) overflowOperator, (KeySelector) keySelector, keyType);
        harness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        // Ask an equivalent assigner which window the element's timestamp is assigned to.
        TumblingWindowAssigner assigner =
                TumblingWindowAssigner.of((Duration) Duration.ofMillis(windowSize));
        Collection assignedWindows =
                assigner.assignWindows(
                        (RowData)
                                GenericRowData.of(
                                        (Object[])
                                                new Object[] {
                                                    StringData.fromString((String) "key2"), 1
                                                }),
                        elementTimestamp);
        TimeWindow window = (TimeWindow) assignedWindows.iterator().next();

        harness.processElement(StreamRecordUtils.insertRecord("key2", 1, elementTimestamp));

        // Sanity checks: the naive cleanup time wraps around and lies below the early watermark.
        Assertions.assertThat((long) (window.maxTimestamp() + lateness))
                .isLessThan(window.maxTimestamp());
        Assertions.assertThat((long) (window.maxTimestamp() + lateness)).isLessThan(earlyWatermark);

        harness.processWatermark(new Watermark(earlyWatermark));

        // Sanity checks: the early watermark is still before the window's max timestamp.
        Assertions.assertThat(earlyWatermark).isLessThan(window.maxTimestamp());
        Assertions.assertThat((long) window.maxTimestamp()).isLessThan(Long.MAX_VALUE);

        // The watermark at the window's max timestamp is expected to emit the window result.
        harness.processWatermark(new Watermark(window.maxTimestamp()));

        expected.add(new Watermark(earlyWatermark));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 1L, 1L, window.getStart(), window.getEnd(), window.maxTimestamp()));
        expected.add(new Watermark(window.maxTimestamp()));
        assertor.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput());

        harness.close();
    }

    /**
     * Verifies tumbling count windows of three elements per key, across a snapshot/restore cycle:
     * every third element of a key is expected to emit the sum, the count and the window id of
     * the completed window.
     *
     * <p>The window size is declared once as a local and fed into the operator builder, so the
     * configuration has a single source of truth. The test body is skipped unless the shift time
     * zone equals {@code UTC_ZONE_ID}.
     *
     * @throws Exception if the test harness fails
     */
    @TestTemplate
    void testTumblingCountWindow() throws Exception {
        if (!UTC_ZONE_ID.equals(this.shiftTimeZone)) {
            return;
        }
        closeCalled.set(0);
        // Number of elements per count window.
        final int windowSize = 3;
        // Window properties for count windows: a single BIGINT column (deliberately shadows the
        // fixture's time-window types).
        LogicalType[] windowTypes = new LogicalType[]{new BigIntType()};
        WindowOperator operator =
                WindowOperatorBuilder.builder()
                        .withInputFields(this.inputFieldTypes)
                        .withShiftTimezone(this.shiftTimeZone)
                        .countWindow(windowSize)
                        .aggregateAndBuild(
                                this.getCountWindowAggFunction(),
                                (RecordEqualiser) this.equaliser,
                                this.accTypes,
                                this.aggResultTypes,
                                windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();

        // Three key2 elements complete window 0 for key2; key1 only has two elements so far.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2, 1000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 10L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 1000L));
        testHarness.processWatermark(new Watermark(12000L));
        testHarness.setProcessingTime(12000L);
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 6L, 3L, 0L)));
        expectedOutput.add(new Watermark(12000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Snapshot, shut down, and restore into a fresh harness around the same operator.
        OperatorSubtaskState snapshotV2 = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshotV2);
        testHarness.open();

        // The third key1 element completes window 0 for key1 using the restored state.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 2500L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 5L, 3L, 0L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Four key2 elements: the first three complete window 1, the fourth opens window 2.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 4, 5501L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, 6000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, 6000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 6, 6050L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 14L, 3L, 1L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Two more key2 elements complete window 2; the key1 element opens window 1 for key1.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, 4000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 10, 15000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 20, 15000L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 36L, 3L, 2L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Two more key1 elements complete window 1 for key1.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 2500L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 7L, 3L, 1L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        // Two close() calls: one before the restore, one at the end.
        ((AbstractIntegerAssert)Assertions.assertThat((int)closeCalled.get()).as("Close was not called.", new Object[0])).isEqualTo(2);
    }

    /**
     * Exercises a sliding count window (size 5, slide 3) across a snapshot/restore cycle.
     *
     * <p>The test body only runs when the shift time zone equals {@code UTC_ZONE_ID}; for any
     * other zone it returns immediately. Elements are fed for two keys, the harness is
     * snapshotted and closed, a new harness is restored from that snapshot, and more elements
     * are fed. After each batch the harness output is compared (sorted) with the expected
     * records. Finally the shared {@code closeCalled} counter must equal 2; the harness is
     * closed twice in this test, so presumably each close reaches the aggregate function once.
     *
     * <p>Expected records are built as (key, sum, count, window id), which matches the row
     * layout produced by the count-window aggregate functions in this file (assuming
     * {@code getCountWindowAggFunction()} returns one of them).
     *
     * @throws Exception if the harness or the operator under test fails
     */
    @TestTemplate
    void testSlidingCountWindow() throws Exception {
        // Skip every parameterization except the UTC one.
        if (!UTC_ZONE_ID.equals(this.shiftTimeZone)) {
            return;
        }
        // Reset the shared close counter so the final assertion only sees this test's closes.
        closeCalled.set(0);
        // NOTE(review): these two locals are not referenced below; the same values are passed
        // as literals to countWindow(5L, 3L).
        int windowSize = 5;
        int windowSlide = 3;
        LogicalType[] windowTypes = new LogicalType[]{new BigIntType()};
        WindowOperator operator = WindowOperatorBuilder.builder().withInputFields(this.inputFieldTypes).withShiftTimezone(this.shiftTimeZone).countWindow(5L, 3L).aggregateAndBuild(this.getCountWindowAggFunction(), (RecordEqualiser)this.equaliser, this.accTypes, this.aggResultTypes, windowTypes);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        // Phase 1: five elements for key2 (values 1..5) and two for key1 (values 1, 2).
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, 0L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2, 1000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 4, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 5, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, 10L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, 1000L));
        testHarness.processWatermark(new Watermark(12000L));
        testHarness.setProcessingTime(12000L);
        // Only key2 is expected so far: sum 1+2+3+4+5 = 15 over 5 elements, window id 0.
        // doubleRecord(...) is defined elsewhere; presumably it duplicates the record for the
        // table-aggregate variant, whose emitValue collects each result twice.
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 15L, 5L, 0L)));
        expectedOutput.add(new Watermark(12000L));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Snapshot, close, and restore into a fresh harness built around the same operator.
        OperatorSubtaskState snapshotV2 = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshotV2);
        testHarness.open();
        // Phase 2: key1 receives values 3, 4, 5 after the restore; together with the two
        // pre-snapshot elements that gives sum 15 over 5 elements, window id 0.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 3, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 4, 2500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5, 2500L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 15L, 5L, 0L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Phase 3: key2 receives values 6..9. Expected sum 30 is consistent with a window over
        // values 4..8 (4+5+6+7+8), reported with window id 1.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 6, 6000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 7, 6000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 8, 6050L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 9, 6050L));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 30L, 5L, 1L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        // Phase 4: key1 receives values 6..8 and key2 receives 10 and 11.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6, 4000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 7, 4000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 8, 4000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 10, 15000L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 11, 15000L));
        // key1: 30 is consistent with values 4..8, window id 1.
        // key2: 45 is consistent with values 7..11 (7+8+9+10+11), window id 2.
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key1", 30L, 5L, 1L)));
        expectedOutput.addAll(this.doubleRecord(this.isTableAggregate, StreamRecordUtils.insertRecord("key2", 45L, 5L, 2L)));
        this.assertor.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        // The counter was reset to 0 at the top and the harness has been closed twice.
        ((AbstractIntegerAssert)Assertions.assertThat((int)closeCalled.get()).as("Close was not called.", new Object[0])).isEqualTo(2);
    }

    /**
     * Builds a count-window operator and calls {@code close()} on it without opening it first.
     *
     * <p>Only executed when the shift time zone equals {@code UTC_ZONE_ID}. The aggregate
     * handle is a placeholder generated function named "MockClass" with empty code. The test
     * passes when {@code close()} returns without throwing.
     *
     * @throws Exception if closing the unopened operator fails
     */
    @TestTemplate
    void testWindowCloseWithoutOpen() throws Exception {
        boolean runsInUtc = UTC_ZONE_ID.equals(this.shiftTimeZone);
        if (runsInUtc) {
            long windowSize = 3L;
            LogicalType[] windowTypes = new LogicalType[]{new BigIntType()};
            WindowOperator operator = WindowOperatorBuilder.builder()
                    .withInputFields(this.inputFieldTypes)
                    .withShiftTimezone(this.shiftTimeZone)
                    .countWindow(windowSize)
                    .aggregate(
                            new GeneratedNamespaceTableAggsHandleFunction("MockClass", "", new Object[0]),
                            this.accTypes,
                            this.aggResultTypes,
                            windowTypes)
                    .build();
            // close() is called although open() never was.
            operator.close();
        }
    }

    /**
     * Passes the given epoch-millisecond value through
     * {@code TimeWindowUtil.toUtcTimestampMills} together with this test's shift time zone.
     *
     * @param epochMills timestamp in epoch milliseconds
     * @return the value returned by {@code TimeWindowUtil.toUtcTimestampMills}
     */
    private long localMills(long epochMills) {
        final ZoneId zone = this.shiftTimeZone;
        final long shifted = TimeWindowUtil.toUtcTimestampMills(epochMills, zone);
        return shifted;
    }

    /**
     * Wraps the given window operator in a keyed one-input test harness that uses this test's
     * key selector and key type.
     *
     * @param operator the window operator under test
     * @return a new harness around {@code operator}
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(WindowOperator operator) throws Exception {
        // Raw views of the operator and selector, exactly as the harness constructor is fed.
        OneInputStreamOperator rawOperator = (OneInputStreamOperator)operator;
        KeySelector rawSelector = (KeySelector)this.keySelector;
        return new KeyedOneInputStreamOperatorTestHarness(rawOperator, rawSelector, this.keyType);
    }

    /**
     * Test {@link RecordEqualiser} that converts both rows with
     * {@code RowDataTestUtil.toGenericRowDeeply} and compares the resulting
     * {@link GenericRowData} instances with {@code equals}.
     */
    private static class GenericRowEqualiser
    implements RecordEqualiser {
        /** Aggregate result types followed by the window types, in that order. */
        private final LogicalType[] fieldTypes;

        /**
         * @param aggResultTypes types of the aggregate result fields
         * @param windowTypes types of the window fields appended after the aggregate results
         */
        GenericRowEqualiser(LogicalType[] aggResultTypes, LogicalType[] windowTypes) {
            int aggCount = aggResultTypes.length;
            int windowCount = windowTypes.length;
            LogicalType[] combined = new LogicalType[aggCount + windowCount];
            System.arraycopy(aggResultTypes, 0, combined, 0, aggCount);
            System.arraycopy(windowTypes, 0, combined, aggCount, windowCount);
            this.fieldTypes = combined;
        }

        /**
         * @return {@code true} when the deep generic-row conversions of both rows are equal
         */
        public boolean equals(RowData row1, RowData row2) {
            GenericRowData first = RowDataTestUtil.toGenericRowDeeply(row1, this.fieldTypes);
            GenericRowData second = RowDataTestUtil.toGenericRowDeeply(row2, this.fieldTypes);
            return first.equals(second);
        }
    }

    /**
     * Table-aggregate flavour of the sum/count test function for {@link TimeWindow}
     * namespaces. The accumulator handling is inherited from {@code SumAndCountAggBase}; this
     * class only adds the emission of results.
     */
    private static class SumAndCountTableAggTimeWindow
    extends SumAndCountAggBase<TimeWindow>
    implements NamespaceTableAggsHandleFunction<TimeWindow> {
        private static final long serialVersionUID = 2062031590687738047L;

        private SumAndCountTableAggTimeWindow() {
        }

        /**
         * Emits the current result for the given window, joined with the key.
         *
         * <p>The value row has five fields: sum, count, window start, window end and the
         * window's max timestamp. Sum and count are left null when the corresponding
         * {@code sumIsNull} / {@code countIsNull} flag is set.
         *
         * @param namespace the window the result belongs to
         * @param key the current key, placed on the left side of the joined row
         * @param out collector receiving the joined row
         * @throws Exception declared by the implemented interface; fails with an assertion
         *     error when {@code open} has not been called
         */
        public void emitValue(TimeWindow namespace, RowData key, Collector<RowData> out) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            GenericRowData row = new GenericRowData(5);
            if (!this.sumIsNull) {
                row.setField(0, this.sum);
            }
            if (!this.countIsNull) {
                row.setField(1, this.count);
            }
            row.setField(2, namespace.getStart());
            row.setField(3, namespace.getEnd());
            row.setField(4, namespace.maxTimestamp());
            this.result.replace(key, row);
            // The result is passed with its own static type: an Object-typed argument is not
            // applicable to Collector<RowData>.collect(RowData).
            // The same joined row is emitted twice; the tests in this file build their expected
            // output through doubleRecord(...), which presumably accounts for the duplicate.
            out.collect(this.result);
            out.collect(this.result);
        }
    }

    /**
     * Plain aggregate flavour of the sum/count test function for {@link TimeWindow}
     * namespaces. Accumulator handling is inherited from {@code SumAndCountAggBase}.
     */
    private static class SumAndCountAggTimeWindow
    extends SumAndCountAggBase<TimeWindow>
    implements NamespaceAggsHandleFunction<TimeWindow> {
        private static final long serialVersionUID = 2062031590687738047L;

        private SumAndCountAggTimeWindow() {
        }

        /**
         * Builds the result row for the given window: sum, count, window start, window end and
         * the window's max timestamp. Sum and count stay null when flagged as null.
         *
         * @param namespace the window the result belongs to
         * @return a new five-field row
         * @throws Exception declared by the implemented interface; fails with an assertion
         *     error when {@code open} has not been called
         */
        public RowData getValue(TimeWindow namespace) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            // Box explicitly so that a null flag maps to a null field.
            Long sumOrNull = this.sumIsNull ? null : Long.valueOf(this.sum);
            Long countOrNull = this.countIsNull ? null : Long.valueOf(this.count);
            GenericRowData value = new GenericRowData(5);
            value.setField(0, sumOrNull);
            value.setField(1, countOrNull);
            value.setField(2, namespace.getStart());
            value.setField(3, namespace.getEnd());
            value.setField(4, namespace.maxTimestamp());
            return value;
        }
    }

    /**
     * Table-aggregate flavour of the sum/count test function for {@link CountWindow}
     * namespaces. The accumulator handling is inherited from {@code SumAndCountAggBase}; this
     * class only adds the emission of results.
     */
    private static class SumAndCountTableAggCountWindow
    extends SumAndCountAggBase<CountWindow>
    implements NamespaceTableAggsHandleFunction<CountWindow> {
        private static final long serialVersionUID = -2634639678371135643L;

        private SumAndCountTableAggCountWindow() {
        }

        /**
         * Emits the current result for the given count window, joined with the key.
         *
         * <p>The value row has three fields: sum, count and the window id. Sum and count are
         * left null when the corresponding {@code sumIsNull} / {@code countIsNull} flag is set.
         *
         * @param namespace the count window the result belongs to
         * @param key the current key, placed on the left side of the joined row
         * @param out collector receiving the joined row
         * @throws Exception declared by the implemented interface; fails with an assertion
         *     error when {@code open} has not been called
         */
        public void emitValue(CountWindow namespace, RowData key, Collector<RowData> out) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            GenericRowData row = new GenericRowData(3);
            if (!this.sumIsNull) {
                row.setField(0, this.sum);
            }
            if (!this.countIsNull) {
                row.setField(1, this.count);
            }
            row.setField(2, namespace.getId());
            this.result.replace(key, row);
            // The result is passed with its own static type: an Object-typed argument is not
            // applicable to Collector<RowData>.collect(RowData).
            // The same joined row is emitted twice; the tests in this file build their expected
            // output through doubleRecord(...), which presumably accounts for the duplicate.
            out.collect(this.result);
            out.collect(this.result);
        }
    }

    /**
     * Plain aggregate flavour of the sum/count test function for {@link CountWindow}
     * namespaces. Accumulator handling is inherited from {@code SumAndCountAggBase}.
     */
    private static class SumAndCountAggCountWindow
    extends SumAndCountAggBase<CountWindow>
    implements NamespaceAggsHandleFunction<CountWindow> {
        private static final long serialVersionUID = -2634639678371135643L;

        private SumAndCountAggCountWindow() {
        }

        /**
         * Builds the result row for the given count window: sum, count and the window id. Sum
         * and count stay null when flagged as null.
         *
         * @param namespace the count window the result belongs to
         * @return a new three-field row
         * @throws Exception declared by the implemented interface; fails with an assertion
         *     error when {@code open} has not been called
         */
        public RowData getValue(CountWindow namespace) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            // Box explicitly so that a null flag maps to a null field.
            Long sumOrNull = this.sumIsNull ? null : Long.valueOf(this.sum);
            Long countOrNull = this.countIsNull ? null : Long.valueOf(this.count);
            GenericRowData value = new GenericRowData(3);
            value.setField(0, sumOrNull);
            value.setField(1, countOrNull);
            value.setField(2, namespace.getId());
            return value;
        }
    }

    /**
     * Session window assigner for tests that assigns a zero-length window
     * ({@code [timestamp, timestamp)}) to elements whose int field at position 1 equals 33, and
     * a window of {@code sessionTimeout} length to every other element.
     */
    private static class PointSessionWindowAssigner
    extends SessionWindowAssigner {
        private static final long serialVersionUID = 1L;
        /** Length of the window assigned to regular (non-point) elements. */
        private final long sessionTimeout;

        /** Creates an event-time assigner with the given session timeout. */
        private PointSessionWindowAssigner(long sessionTimeout) {
            this(sessionTimeout, true);
        }

        /**
         * @param sessionTimeout length of the window assigned to regular elements
         * @param isEventTime forwarded unchanged to the {@link SessionWindowAssigner} constructor
         */
        private PointSessionWindowAssigner(long sessionTimeout, boolean isEventTime) {
            super(sessionTimeout, isEventTime);
            this.sessionTimeout = sessionTimeout;
        }

        /**
         * @param element the incoming row; its int field at position 1 selects the window shape
         * @param timestamp start of the assigned window
         * @return a single-element collection holding the assigned window
         */
        public Collection<TimeWindow> assignWindows(RowData element, long timestamp) {
            boolean pointElement = element.getInt(1) == 33;
            long windowEnd = pointElement ? timestamp : timestamp + this.sessionTimeout;
            return Collections.singletonList(new TimeWindow(timestamp, windowEnd));
        }

        /** Returns a new assigner with the same timeout, constructed with {@code isEventTime = true}. */
        public SessionWindowAssigner withEventTime() {
            return new PointSessionWindowAssigner(this.sessionTimeout, true);
        }

        /** Returns a new assigner with the same timeout, constructed with {@code isEventTime = false}. */
        public SessionWindowAssigner withProcessingTime() {
            return new PointSessionWindowAssigner(this.sessionTimeout, false);
        }
    }

    /**
     * Shared accumulator logic for the sum/count test aggregate functions.
     *
     * <p>The accumulator is a two-field row (sum, count) over the int field at position 1 of
     * each input row. Every operation except {@code open}, {@code cleanup} and {@code close}
     * fails the test when {@code open} has not been called first. {@code close} increments the
     * enclosing test's {@code closeCalled} counter.
     *
     * @param <W> the window (namespace) type
     */
    private static class SumAndCountAggBase<W extends Window> {
        /** Set by {@link #open}; checked by the other operations. */
        boolean openCalled;
        long sum;
        boolean sumIsNull;
        long count;
        boolean countIsNull;
        /** Reusable joined row, created in {@link #open}. */
        protected transient JoinedRowData result;

        private SumAndCountAggBase() {
        }

        /** Fails the running test when {@link #open} has not been called yet. */
        private void requireOpened() {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
        }

        /** Marks the function as opened and creates the reusable joined row. */
        public void open(StateDataViewStore store) throws Exception {
            this.openCalled = true;
            this.result = new JoinedRowData();
        }

        /** Loads sum and count (and their null flags) from fields 0 and 1 of {@code acc}. */
        public void setAccumulators(W namespace, RowData acc) throws Exception {
            this.requireOpened();
            boolean sumMissing = acc.isNullAt(0);
            this.sumIsNull = sumMissing;
            if (!sumMissing) {
                this.sum = acc.getLong(0);
            }
            boolean countMissing = acc.isNullAt(1);
            this.countIsNull = countMissing;
            if (!countMissing) {
                this.count = acc.getLong(1);
            }
        }

        /** Adds the int field at position 1 to the sum and bumps the count; null inputs are ignored. */
        public void accumulate(RowData inputRow) throws Exception {
            this.requireOpened();
            if (inputRow.isNullAt(1)) {
                return;
            }
            this.sum += (long)inputRow.getInt(1);
            this.count++;
        }

        /** Subtracts the int field at position 1 from the sum and lowers the count; null inputs are ignored. */
        public void retract(RowData inputRow) throws Exception {
            this.requireOpened();
            if (inputRow.isNullAt(1)) {
                return;
            }
            this.sum -= (long)inputRow.getInt(1);
            this.count--;
        }

        /** Adds the non-null sum and count of {@code otherAcc} to the current values. */
        public void merge(W w, RowData otherAcc) throws Exception {
            this.requireOpened();
            if (!otherAcc.isNullAt(0)) {
                this.sum += otherAcc.getLong(0);
            }
            if (!otherAcc.isNullAt(1)) {
                this.count += otherAcc.getLong(1);
            }
        }

        /** Returns a fresh accumulator row with sum 0 and count 0. */
        public RowData createAccumulators() {
            this.requireOpened();
            return GenericRowData.of(0L, 0L);
        }

        /** Returns the current accumulator row; a field stays null when its null flag is set. */
        public RowData getAccumulators() throws Exception {
            this.requireOpened();
            // Box explicitly so that a null flag maps to a null field.
            Long sumOrNull = this.sumIsNull ? null : Long.valueOf(this.sum);
            Long countOrNull = this.countIsNull ? null : Long.valueOf(this.count);
            GenericRowData acc = new GenericRowData(2);
            acc.setField(0, sumOrNull);
            acc.setField(1, countOrNull);
            return acc;
        }

        /** No per-window state to clean up. */
        public void cleanup(W window) {
        }

        /** Records the close in the enclosing test's {@code closeCalled} counter. */
        public void close() {
            closeCalled.incrementAndGet();
        }
    }
}

