/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.NonPojoType;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorFactory;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.test.util.MigrationTest;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class WindowOperatorMigrationTest
implements MigrationTest {
    // Type information for Tuple2<String, Integer>, captured through an anonymous TypeHint;
    // the methods below use it to create the serializer for the "window-contents" state descriptors.
    private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE = TypeInformation.of((TypeHint)new TypeHint<Tuple2<String, Integer>>(){});
    // Flink version this test instance restores from; set once by the constructor and embedded in
    // the snapshot resource file names used by the restore tests.
    private final FlinkVersion testMigrateVersion;

    /**
     * Supplies the Flink versions this parameterized test runs against.
     *
     * @return the result of {@code FlinkVersion.rangeOf} from {@code v1_8} to the version reported by
     *     {@code MigrationTest.getMostRecentlyPublishedVersion()}
     */
    @Parameters(name="Migration Savepoint: {0}")
    public static Collection<FlinkVersion> parameters() {
        FlinkVersion oldestVersion = FlinkVersion.v1_8;
        FlinkVersion newestVersion = MigrationTest.getMostRecentlyPublishedVersion();
        Collection<FlinkVersion> versions = FlinkVersion.rangeOf(oldestVersion, newestVersion);
        return versions;
    }

    /**
     * Creates a test instance bound to one Flink version.
     *
     * @param testMigrateVersion version whose snapshot files the restore tests load; stored as-is
     */
    public WindowOperatorMigrationTest(FlinkVersion testMigrateVersion) {
        this.testMigrateVersion = testMigrateVersion;
    }

    /**
     * Writes the snapshot file read by {@code testRestoreSessionWindowsWithCountTrigger}.
     *
     * <p>Builds an event-time session-window operator ({@code sessionSize}-second gap) guarded by a
     * purging count trigger of 4, feeds it four {@code key2} and two {@code key1} records, and writes
     * the resulting operator state to
     * {@code src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form is embedded in the file name
     * @throws Exception if the harness or the snapshot write fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeSessionWindowsWithCountTriggerSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)EventTimeSessionWindows.withGap(Duration.ofSeconds(sessionSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()),
                (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of(4L)),
                0L,
                null);
        // try-with-resources closes the harness even when a step below throws.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 3500L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the session-window/count-trigger snapshot for {@code testMigrateVersion} and checks
     * the output produced by further {@code key1} records.
     *
     * <p>After four additional records the output is asserted to be empty; after a fifth record
     * (value 10, timestamp 4500) the output is asserted to contain exactly one record,
     * {@code ("key1-22", 10, 10000)} with timestamp 9999.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @TestTemplate
    void testRestoreSessionWindowsWithCountTrigger() throws Exception {
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)EventTimeSessionWindows.withGap(Duration.ofSeconds(sessionSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()),
                (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of(4L)),
                0L,
                null);
        // Object element type matches the other restore tests in this class.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when an assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-session-with-stateful-trigger-flink" + this.testMigrateVersion + "-snapshot"));
            testHarness.open();
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 6000L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 6500L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 7000L));
            // Nothing is expected to have been emitted yet.
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), (Comparator)new Tuple3ResultSortComparator());
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 10), 4500L));
            expectedOutput.add(new StreamRecord(new Tuple3("key1-22", 10L, 10000L), 9999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), (Comparator)new Tuple3ResultSortComparator());
        }
    }

    /**
     * Writes the snapshot file read by {@code testRestoreSessionWindowsWithCountTriggerInMintCondition}.
     *
     * <p>The session-window operator is set up and opened but receives no input before the snapshot
     * is taken. The state is written to
     * {@code src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form is embedded in the file name
     * @throws Exception if the harness or the snapshot write fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeSessionWindowsWithCountTriggerInMintConditionSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)EventTimeSessionWindows.withGap(Duration.ofSeconds(sessionSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()),
                (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of(4L)),
                0L,
                null);
        // try-with-resources closes the harness even when a step below throws.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the input-free ("mint") session-window snapshot for {@code testMigrateVersion} and
     * then runs the full record sequence against the restored operator.
     *
     * <p>After the first ten records the output is asserted to be {@code ("key2-10", 0, 6500)} with
     * timestamp 6499; after one more {@code key1} record (value 10, timestamp 4500) the output is
     * asserted to additionally contain {@code ("key1-22", 10, 10000)} with timestamp 9999.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @TestTemplate
    void testRestoreSessionWindowsWithCountTriggerInMintCondition() throws Exception {
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)EventTimeSessionWindows.withGap(Duration.ofSeconds(sessionSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()),
                (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of(4L)),
                0L,
                null);
        // Object element type matches the other restore tests in this class.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when an assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-session-with-stateful-trigger-mint-flink" + this.testMigrateVersion + "-snapshot"));
            testHarness.open();
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 0L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 2), 1000L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 3), 2500L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 4), 3500L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 10L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 1000L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 2500L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 6000L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 2), 6500L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 3), 7000L));
            expectedOutput.add(new StreamRecord(new Tuple3("key2-10", 0L, 6500L), 6499L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), (Comparator)new Tuple3ResultSortComparator());
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 10), 4500L));
            expectedOutput.add(new StreamRecord(new Tuple3("key1-22", 10L, 10000L), 9999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), (Comparator)new Tuple3ResultSortComparator());
        }
    }

    /**
     * Writes the snapshot file read by {@code testRestoreReducingEventTimeWindows}.
     *
     * <p>Builds a tumbling event-time window operator ({@code windowSize}-second windows) backed by a
     * reducing state with {@code SumReducer}, feeds it eight records, advances the watermark to 999
     * and then 1999 (asserting after each that only the forwarded watermarks were emitted), and
     * writes the operator state to
     * {@code src/test/resources/win-op-migration-test-reduce-event-time-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form is embedded in the file name
     * @throws Exception if the harness or the snapshot write fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeReducingEventTimeWindowsSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingEventTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()),
                (Trigger)EventTimeTrigger.create(),
                0L,
                null);
        // Object element type matches the restore tests in this class.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when an assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
            testHarness.processWatermark(new Watermark(999L));
            expectedOutput.add(new Watermark(999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
            testHarness.processWatermark(new Watermark(1999L));
            expectedOutput.add(new Watermark(1999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-reduce-event-time-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the reducing event-time snapshot for {@code testMigrateVersion} and advances the
     * watermark to 2999, 3999, 4999 and 5999.
     *
     * <p>The asserted output is {@code (key1, 3)} and {@code (key2, 3)} at timestamp 2999,
     * {@code (key2, 2)} at timestamp 5999, plus the four forwarded watermarks.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @TestTemplate
    void testRestoreReducingEventTimeWindows() throws Exception {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingEventTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()),
                (Trigger)EventTimeTrigger.create(),
                0L,
                null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when the assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-reduce-event-time-flink" + this.testMigrateVersion + "-snapshot"));
            testHarness.open();
            testHarness.processWatermark(new Watermark(2999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key1", 3), 2999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
            expectedOutput.add(new Watermark(2999L));
            testHarness.processWatermark(new Watermark(3999L));
            expectedOutput.add(new Watermark(3999L));
            testHarness.processWatermark(new Watermark(4999L));
            expectedOutput.add(new Watermark(4999L));
            testHarness.processWatermark(new Watermark(5999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), 5999L));
            expectedOutput.add(new Watermark(5999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
        }
    }

    /**
     * Writes the snapshot file read by {@code testRestoreApplyEventTimeWindows}.
     *
     * <p>Same input and watermark sequence as {@code writeReducingEventTimeWindowsSnapshot}, but the
     * operator keeps its contents in list state and applies {@code RichSumReducer} as the window
     * function. The state is written to
     * {@code src/test/resources/win-op-migration-test-apply-event-time-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form is embedded in the file name
     * @throws Exception if the harness or the snapshot write fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeApplyEventTimeWindowsSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingEventTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()),
                (Trigger)EventTimeTrigger.create(),
                0L,
                null);
        // Object element type matches the restore tests in this class.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when an assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3999L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 3000L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 20L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 0L));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1), 999L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1998L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1999L));
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1), 1000L));
            testHarness.processWatermark(new Watermark(999L));
            expectedOutput.add(new Watermark(999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
            testHarness.processWatermark(new Watermark(1999L));
            expectedOutput.add(new Watermark(1999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-apply-event-time-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the apply event-time snapshot for {@code testMigrateVersion} and advances the
     * watermark to 2999, 3999, 4999 and 5999.
     *
     * <p>The asserted output is {@code (key1, 3)} and {@code (key2, 3)} at timestamp 2999,
     * {@code (key2, 2)} at timestamp 5999, plus the four forwarded watermarks.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @TestTemplate
    void testRestoreApplyEventTimeWindows() throws Exception {
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingEventTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()),
                (Trigger)EventTimeTrigger.create(),
                0L,
                null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when the assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-apply-event-time-flink" + this.testMigrateVersion + "-snapshot"));
            testHarness.open();
            testHarness.processWatermark(new Watermark(2999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key1", 3), 2999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 3), 2999L));
            expectedOutput.add(new Watermark(2999L));
            testHarness.processWatermark(new Watermark(3999L));
            expectedOutput.add(new Watermark(3999L));
            testHarness.processWatermark(new Watermark(4999L));
            expectedOutput.add(new Watermark(4999L));
            testHarness.processWatermark(new Watermark(5999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 2), 5999L));
            expectedOutput.add(new Watermark(5999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
        }
    }

    /**
     * Writes the snapshot file read by {@code testRestoreReducingProcessingTimeWindows}.
     *
     * <p>Builds a tumbling processing-time window operator ({@code windowSize}-second windows) backed
     * by a reducing state with {@code SumReducer}. Two records are processed at processing time 10
     * and two more at 3010; the output is then asserted to be {@code (key1, 1)} and {@code (key2, 1)}
     * at timestamp 2999 before the state is written to
     * {@code src/test/resources/win-op-migration-test-reduce-processing-time-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form is embedded in the file name
     * @throws Exception if the harness or the snapshot write fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeReducingProcessingTimeWindowsSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingProcessingTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()),
                (Trigger)ProcessingTimeTrigger.create(),
                0L,
                null);
        // Object element type matches the event-time restore tests in this class.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when the assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            testHarness.setProcessingTime(10L);
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1)));
            testHarness.setProcessingTime(3010L);
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));
            testHarness.processElement(new StreamRecord(new Tuple2("key3", 1)));
            expectedOutput.add(new StreamRecord(new Tuple2("key1", 1), 2999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 1), 2999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-reduce-processing-time-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the reducing processing-time snapshot for {@code testMigrateVersion}, processes two
     * more records at processing time 3020 and advances processing time to 6000.
     *
     * <p>The asserted output is {@code (key1, 3)}, {@code (key2, 4)} and {@code (key3, 1)}, each at
     * timestamp 5999.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @TestTemplate
    void testRestoreReducingProcessingTimeWindows() throws Exception {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingProcessingTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()),
                (Trigger)ProcessingTimeTrigger.create(),
                0L,
                null);
        // Object element type matches the event-time restore tests in this class.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when the assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-reduce-processing-time-flink" + this.testMigrateVersion + "-snapshot"));
            testHarness.open();
            testHarness.setProcessingTime(3020L);
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 3)));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 3)));
            testHarness.setProcessingTime(6000L);
            expectedOutput.add(new StreamRecord(new Tuple2("key1", 3), 5999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), 5999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key3", 1), 5999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
        }
    }

    /**
     * Writes the snapshot file read by {@code testRestoreApplyProcessingTimeWindows}.
     *
     * <p>Same input and processing-time sequence as {@code writeReducingProcessingTimeWindowsSnapshot},
     * but the operator keeps its contents in list state and applies {@code RichSumReducer} as the
     * window function. The state is written to
     * {@code src/test/resources/win-op-migration-test-apply-processing-time-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form is embedded in the file name
     * @throws Exception if the harness or the snapshot write fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeApplyProcessingTimeWindowsSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingProcessingTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()),
                (Trigger)ProcessingTimeTrigger.create(),
                0L,
                null);
        // Object element type matches the event-time restore tests in this class.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when the assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();
            testHarness.setProcessingTime(10L);
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 1)));
            testHarness.setProcessingTime(3010L);
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 1)));
            testHarness.processElement(new StreamRecord(new Tuple2("key3", 1)));
            expectedOutput.add(new StreamRecord(new Tuple2("key1", 1), 2999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 1), 2999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
            OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-apply-processing-time-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the apply processing-time snapshot for {@code testMigrateVersion}, processes two more
     * records at processing time 3020 and advances processing time to 6000.
     *
     * <p>The asserted output is {@code (key1, 3)}, {@code (key2, 4)} and {@code (key3, 1)}, each at
     * timestamp 5999.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @TestTemplate
    void testRestoreApplyProcessingTimeWindows() throws Exception {
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingProcessingTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()),
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()),
                (Trigger)ProcessingTimeTrigger.create(),
                0L,
                null);
        // Object element type matches the event-time restore tests in this class.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        // try-with-resources closes the harness even when the assertion below fails.
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, new TupleKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.setup();
            testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-apply-processing-time-flink" + this.testMigrateVersion + "-snapshot"));
            testHarness.open();
            testHarness.setProcessingTime(3020L);
            testHarness.processElement(new StreamRecord(new Tuple2("key2", 3)));
            testHarness.processElement(new StreamRecord(new Tuple2("key1", 3)));
            testHarness.setProcessingTime(6000L);
            expectedOutput.add(new StreamRecord(new Tuple2("key1", 3), 5999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key2", 4), 5999L));
            expectedOutput.add(new StreamRecord(new Tuple2("key3", 1), 5999L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, (Iterable)testHarness.getOutput(), new Tuple2ResultSortComparator());
        }
    }

    /**
     * Generates the {@code win-op-migration-test-kryo-serialized-key} snapshot: runs an event-time
     * tumbling-window operator keyed by {@code NonPojoType}, feeds it a fixed set of elements and
     * two watermarks, and writes the resulting operator state under {@code src/test/resources}.
     *
     * @param flinkGenerateSavepointVersion version whose string form is embedded in the snapshot
     *     file name
     * @throws Exception if the harness fails or the snapshot cannot be written
     */
    @MigrationTest.SnapshotsGenerator
    public void writeWindowsWithKryoSerializedKeysSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        // Window length in seconds; drives the tumbling window assigner below.
        int windowSize = 3;
        TypeInformation inputType = new TypeHint<Tuple2<NonPojoType, Integer>>(){}.getTypeInfo();
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor(
                "window-contents",
                new SumReducer(),
                inputType.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        TypeSerializer keySerializer = TypeInformation.of(NonPojoType.class).createSerializer((SerializerConfig)new SerializerConfigImpl());
        // The snapshot is only meaningful for this test if the key really goes through Kryo.
        Assertions.assertThat((Object)keySerializer).isInstanceOf(KryoSerializer.class);
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingEventTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                keySerializer,
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()),
                (Trigger)EventTimeTrigger.create(),
                0L,
                null);
        // Typed as Object so it lines up with the harness output and the Object comparator.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(
                (StreamOperatorFactory)operator,
                new TupleKeySelector(),
                TypeInformation.of(NonPojoType.class));
        testHarness.setup();
        testHarness.open();

        testHarness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 3999L));
        testHarness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 3000L));
        testHarness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key1"), 1), 20L));
        testHarness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key1"), 1), 0L));
        testHarness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key1"), 1), 999L));
        testHarness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 1998L));
        testHarness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 1999L));
        testHarness.processElement(new StreamRecord(new Tuple2(new NonPojoType("key2"), 1), 1000L));

        // Up to watermark 1999 the only expected output is the forwarded watermarks themselves.
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                (Iterable)testHarness.getOutput(),
                new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                (Iterable)testHarness.getOutput(),
                new Tuple2ResultSortComparator());

        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        OperatorSnapshotUtil.writeStateHandle(
                (OperatorSubtaskState)snapshot,
                "src/test/resources/win-op-migration-test-kryo-serialized-key-flink" + flinkGenerateSavepointVersion + "-snapshot");
        testHarness.close();
    }

    /**
     * Restores an event-time tumbling-window operator keyed by the Kryo-serialized
     * {@code NonPojoType} from the {@code win-op-migration-test-kryo-serialized-key} snapshot of
     * the migrated version, then advances the watermark and checks the emitted window results.
     *
     * <p>No elements are fed after the restore, so every expected record presumably originates
     * from state contained in the snapshot.
     *
     * @throws Exception if the harness fails to set up, restore, process or close
     */
    @TestTemplate
    void testRestoreKryoSerializedKeysWindows() throws Exception {
        // Window length in seconds; drives the tumbling window assigner below.
        int windowSize = 3;
        TypeInformation inputType = new TypeHint<Tuple2<NonPojoType, Integer>>(){}.getTypeInfo();
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor(
                "window-contents",
                new SumReducer(),
                inputType.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        TypeSerializer keySerializer = TypeInformation.of(NonPojoType.class).createSerializer((SerializerConfig)new SerializerConfigImpl());
        // The restore path under test requires the key to be serialized with Kryo.
        Assertions.assertThat((Object)keySerializer).isInstanceOf(KryoSerializer.class);
        WindowOperatorFactory operator = new WindowOperatorFactory(
                (WindowAssigner)TumblingEventTimeWindows.of(Duration.ofSeconds(windowSize)),
                (TypeSerializer)new TimeWindow.Serializer(),
                new TupleKeySelector(),
                keySerializer,
                (StateDescriptor)stateDesc,
                (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()),
                (Trigger)EventTimeTrigger.create(),
                0L,
                null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(
                (StreamOperatorFactory)operator,
                new TupleKeySelector(),
                TypeInformation.of(NonPojoType.class));
        testHarness.setup();
        testHarness.initializeState(OperatorSnapshotUtil.getResourceFilename(
                "win-op-migration-test-kryo-serialized-key-flink" + this.testMigrateVersion + "-snapshot"));
        testHarness.open();

        // Watermark 2999: one summed record per key, both stamped 2999, is expected.
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new StreamRecord(new Tuple2(new NonPojoType("key1"), 3), 2999L));
        expectedOutput.add(new StreamRecord(new Tuple2(new NonPojoType("key2"), 3), 2999L));
        expectedOutput.add(new Watermark(2999L));

        // Watermarks 3999 and 4999 are expected to be forwarded without any records.
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));

        // Watermark 5999: a single key2 record stamped 5999 is expected.
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new StreamRecord(new Tuple2(new NonPojoType("key2"), 2), 5999L));
        expectedOutput.add(new Watermark(5999L));

        TestHarnessUtil.assertOutputEqualsSorted(
                "Output was not correct.",
                expectedOutput,
                (Iterable)testHarness.getOutput(),
                new Tuple2ResultSortComparator());
        testHarness.close();
    }

    private static class TupleKeySelector<K>
    implements KeySelector<Tuple2<K, Integer>, K> {
        private static final long serialVersionUID = 1L;

        private TupleKeySelector() {
        }

        public K getKey(Tuple2<K, Integer> value) throws Exception {
            return (K)value.f0;
        }
    }

    private static class SessionWindowFunction
    implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private SessionWindowFunction() {
        }

        public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple3<String, Long, Long>> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> i : values) {
                sum += ((Integer)i.f1).intValue();
            }
            String resultString = key + "-" + sum;
            out.collect((Object)new Tuple3((Object)resultString, (Object)window.getStart(), (Object)window.getEnd()));
        }
    }

    /**
     * Orders harness output whose records carry {@code Tuple3<String, Long, Long>} values: first
     * by record timestamp, then by {@code f0}, then by {@code f1}, and finally by {@code f2}.
     *
     * <p>If either argument is a {@link Watermark} the two are reported as equal.
     */
    private static class Tuple3ResultSortComparator
    implements Comparator<Object> {
        private Tuple3ResultSortComparator() {
        }

        /**
         * @param o1 a {@code Watermark} or a {@code StreamRecord} holding a {@code Tuple3}
         * @param o2 a {@code Watermark} or a {@code StreamRecord} holding a {@code Tuple3}
         * @return 0 if either argument is a watermark, otherwise the sign of the field-wise
         *     comparison described on the class
         */
        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord<?> sr0 = (StreamRecord<?>)o1;
            StreamRecord<?> sr1 = (StreamRecord<?>)o2;
            // Long.compare rather than a narrowed subtraction, which can overflow and flip sign.
            int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            Tuple3<?, ?, ?> left = (Tuple3<?, ?, ?>)sr0.getValue();
            Tuple3<?, ?, ?> right = (Tuple3<?, ?, ?>)sr1.getValue();
            int byName = ((String)left.f0).compareTo((String)right.f0);
            if (byName != 0) {
                return byName;
            }
            int bySecondField = Long.compare((Long)left.f1, (Long)right.f1);
            if (bySecondField != 0) {
                return bySecondField;
            }
            // Last tie-break uses the third field; f1 has already been compared above.
            return Long.compare((Long)left.f2, (Long)right.f2);
        }
    }

    private static class SumReducer<K>
    implements ReduceFunction<Tuple2<K, Integer>> {
        private static final long serialVersionUID = 1L;

        private SumReducer() {
        }

        public Tuple2<K, Integer> reduce(Tuple2<K, Integer> value1, Tuple2<K, Integer> value2) throws Exception {
            return new Tuple2(value2.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
        }
    }

    /**
     * Orders harness output whose records carry {@code Tuple2} values: first by record
     * timestamp, then by {@code f0} (which must be {@link Comparable}), then by the integer
     * {@code f1}.
     *
     * <p>If either argument is a {@link Watermark} the two are reported as equal.
     *
     * @param <K> comparable type of the tuple's first field
     */
    private static class Tuple2ResultSortComparator<K extends Comparable>
    implements Comparator<Object> {
        private Tuple2ResultSortComparator() {
        }

        /**
         * @param o1 a {@code Watermark} or a {@code StreamRecord} holding a {@code Tuple2}
         * @param o2 a {@code Watermark} or a {@code StreamRecord} holding a {@code Tuple2}
         * @return 0 if either argument is a watermark, otherwise the sign of the field-wise
         *     comparison described on the class
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"}) // f0 is only known to be Comparable at runtime
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord<?> sr0 = (StreamRecord<?>)o1;
            StreamRecord<?> sr1 = (StreamRecord<?>)o2;
            // Long.compare rather than a narrowed subtraction, which can overflow and flip sign.
            int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            Tuple2<?, ?> left = (Tuple2<?, ?>)sr0.getValue();
            Tuple2<?, ?> right = (Tuple2<?, ?>)sr1.getValue();
            int byKey = ((Comparable)left.f0).compareTo(right.f0);
            if (byKey != 0) {
                return byKey;
            }
            // Integer.compare avoids the overflow that plain subtraction has for distant values.
            return Integer.compare((Integer)left.f1, (Integer)right.f1);
        }
    }

    private static class RichSumReducer<W extends Window>
    extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
        private static final long serialVersionUID = 1L;
        private boolean openCalled = false;

        private RichSumReducer() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
        }

        public void apply(String key, W window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.openCalled).as("Open was not called", new Object[0])).isTrue();
            int sum = 0;
            for (Tuple2<String, Integer> t : input) {
                sum += ((Integer)t.f1).intValue();
            }
            out.collect((Object)new Tuple2((Object)key, (Object)sum));
        }
    }
}

