/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.harness;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.harness.HarnessTestBase;
import org.apache.flink.table.planner.runtime.harness.WindowAggregateHarnessTest$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\t%b\u0001B\u0010!\u0001=B\u0001\u0002\u000e\u0001\u0003\u0002\u0003\u0006I!\u000e\u0005\t\u001f\u0002\u0011\t\u0011)A\u0005!\")\u0001\f\u0001C\u00013\"9Q\f\u0001b\u0001\n\u0013q\u0006BB0\u0001A\u0003%\u0001\u000bC\u0003a\u0001\u0011\u0005\u0013\rC\u0003t\u0001\u0011\u0005\u0011\rC\u0003y\u0001\u0011\u0005\u0011\rC\u0003{\u0001\u0011\u0005\u0011\rC\u0003}\u0001\u0011\u0005\u0011\rC\u0003\u007f\u0001\u0011\u0005\u0011\r\u0003\u0004\u0002\u0002\u0001!\t!\u0019\u0005\u0007\u0003\u000b\u0001A\u0011A1\t\r\u0005%\u0001\u0001\"\u0001b\u0011\u0019\ti\u0001\u0001C\u0001C\"9\u0011\u0011\u0003\u0001\u0005\n\u0005M\u0001bBA6\u0001\u0011%\u0011Q\u000e\u0005\b\u0003k\u0002A\u0011BA<\u0011\u001d\t9\n\u0001C\u0005\u00033Cq!a+\u0001\t\u0013\tikB\u0004\u0002d\u0002B\t!!:\u0007\r}\u0001\u0003\u0012AAt\u0011\u0019Af\u0003\"\u0001\u0002p\"9\u0011\u0011\u001f\f\u0005\u0002\u0005M\b\"\u0003B\u000e-\t\u0007I\u0011\u0001B\u000f\u0011!\u0011yB\u0006Q\u0001\n\u0005E\u0003\"\u0003B\u0011-\t\u0007I\u0011\u0001B\u000f\u0011!\u0011\u0019C\u0006Q\u0001\n\u0005E\u0003\"\u0003B\u0013-\t\u0007I\u0011\u0001B\u000f\u0011!\u00119C\u0006Q\u0001\n\u0005E#AG,j]\u0012|w/Q4he\u0016<\u0017\r^3ICJtWm]:UKN$(BA\u0011#\u0003\u001dA\u0017M\u001d8fgNT!a\t\u0013\u0002\u000fI,h\u000e^5nK*\u0011QEJ\u0001\ba2\fgN\\3s\u0015\t9\u0003&A\u0003uC\ndWM\u0003\u0002*U\u0005)a\r\\5oW*\u00111\u0006L\u0001\u0007CB\f7\r[3\u000b\u00035\n1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\u0019\u0011\u0005E\u0012T\"\u0001\u0011\n\u0005M\u0002#a\u0004%be:,7o\u001d+fgR\u0014\u0015m]3\u0002\u000f\t\f7m[3oIB\u0011a\u0007\u0014\b\u0003o%s!\u0001O$\u000f\u0005e2eB\u0001\u001eF\u001d\tYDI\u0004\u0002=\u0007:\u0011QH\u0011\b\u0003}\u0005k\u0011a\u0010\u0006\u0003\u0001:\na\u0001\u0010:p_Rt\u0014\"A\u0017\n\u0005-b\u0013BA\u0015+\u0013\t9\u0003&\u0003\u0002&M%\u00111\u0005J\u0005\u0003\u0011\n\nQ!\u001e;jYNL!AS&\u00025M#(/Z1nS:<w+\u001b;i'R\fG/\u001a+fgR\u0014\u0015m]3\u000b\u0005!\u0013\u0013BA'O\u0005A\u0019F/\u00
19;f\u0005\u0006\u001c7.\u001a8e\u001b>$WM\u0003\u0002K\u0017\u0006i1\u000f[5giRKW.\u001a.p]\u0016\u0004\"!\u0015,\u000e\u0003IS!a\u0015+\u0002\tQLW.\u001a\u0006\u0002+\u0006!!.\u0019<b\u0013\t9&K\u0001\u0004[_:,\u0017\nZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0007i[F\f\u0005\u00022\u0001!)Ag\u0001a\u0001k!)qj\u0001a\u0001!\u0006YQ\u000bV\"`5>sUiX%E+\u0005\u0001\u0016\u0001D+U\u0007~SvJT#`\u0013\u0012\u0003\u0013A\u00022fM>\u0014X\rF\u0001c!\t\u0019g-D\u0001e\u0015\u0005)\u0017!B:dC2\f\u0017BA4e\u0005\u0011)f.\u001b;)\u0005\u0019I\u0007C\u00016r\u001b\u0005Y'B\u00017n\u0003\r\t\u0007/\u001b\u0006\u0003]>\fqA[;qSR,'O\u0003\u0002qY\u0005)!.\u001e8ji&\u0011!o\u001b\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017A\b;fgR\u0004&o\\2fgNLgn\u001a+j[\u0016$V/\u001c2mK^Kg\u000eZ8xQ\t9Q\u000f\u0005\u0002km&\u0011qo\u001b\u0002\r)\u0016\u001cH\u000fV3na2\fG/Z\u0001,i\u0016\u001cH\u000f\u0015:pG\u0016\u001c8/\u001b8h)&lW\rV;nE2,w+\u001b8e_^<\u0016\u000e\u001e5D\t\u000e\u001bv.\u001e:dK\"\u0012\u0001\"^\u0001\u001ci\u0016\u001cH\u000f\u0015:pG\u0016\u001c8/\u001b8h)&lW\rS8q/&tGm\\<)\u0005%)\u0018\u0001\u000b;fgR\u0004&o\\2fgNLgn\u001a+j[\u0016Du\u000e],j]\u0012|woV5uQ\u000e#5iU8ve\u000e,\u0007F\u0001\u0006v\u0003\u0001\"Xm\u001d;Qe>\u001cWm]:j]\u001e$\u0016.\\3Dk6,H.\u0019;f/&tGm\\<)\u0005-)\u0018!\f;fgR\u0004&o\\2fgNLgn\u001a+j[\u0016\u001cU/\\;mCR,w+\u001b8e_^<\u0016\u000e\u001e5D\t\u000e\u001bv.\u001e:dK\"\u0012A\"^\u0001\u0015i\u0016\u001cHo\u00117pg\u0016<\u0016\u000e\u001e5pkR|\u0005/\u001a8)\u00055)\u0018a\u000b;fgR$vo\u001c)iCN,w+\u001b8e_^\fum\u001a:fO\u0006$Xm\u00117pg\u0016<\u0016\u000e\u001e5pkR|\u0005/\u001a8)\u00059)\u0018!\r;fgR\u0004&o\\2fgNLgn\u001a+j[\u0016$V/\u001c2mK^Kg\u000eZ8x/&$\bNR;ukJ,w+\u0019;fe6\f'o\u001b\u0015\u0003\u001fU\f!e\u0019:fCR,\u0007K]8dKN\u001c\u0018N\\4US6,w+\u001b8e_^|\u0005/\u001a:bi>\u0014HCBA\u000b\u0003\u001b\n\t\u0007E\u0004d\u0003/\tY\"a\u000e\n\u0007\u0005eAM\u0001\u0004UkBdWM\r\t\u000b\u0003;\t9#a\u000b\u0002,\u0005-RBAA\u0010\u0015\
u0011\t\t#a\t\u0002\tU$\u0018\u000e\u001c\u0006\u0004\u0003KA\u0013!C:ue\u0016\fW.\u001b8h\u0013\u0011\tI#a\b\u0003M-+\u00170\u001a3P]\u0016Le\u000e];u'R\u0014X-Y7Pa\u0016\u0014\u0018\r^8s)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000f\u0005\u0003\u0002.\u0005MRBAA\u0018\u0015\r\t\tDJ\u0001\u0005I\u0006$\u0018-\u0003\u0003\u00026\u0005=\"a\u0002*po\u0012\u000bG/\u0019\t\u0006G\u0006e\u0012QH\u0005\u0004\u0003w!'!B!se\u0006L\b\u0003BA \u0003\u0013j!!!\u0011\u000b\t\u0005\r\u0013QI\u0001\bY><\u0017nY1m\u0015\r\t9EJ\u0001\u0006if\u0004Xm]\u0005\u0005\u0003\u0017\n\tEA\u0006M_\u001eL7-\u00197UsB,\u0007bBA(!\u0001\u0007\u0011\u0011K\u0001\u000bi\u0016\u001cHoV5oI><\b\u0003BA*\u00037rA!!\u0016\u0002XA\u0011a\bZ\u0005\u0004\u00033\"\u0017A\u0002)sK\u0012,g-\u0003\u0003\u0002^\u0005}#AB*ue&twMC\u0002\u0002Z\u0011Dq!a\u0019\u0011\u0001\u0004\t)'A\u0006jg\u000e#5iU8ve\u000e,\u0007cA2\u0002h%\u0019\u0011\u0011\u000e3\u0003\u000f\t{w\u000e\\3b]\u0006Q\u0011N\\4fgR$\u0015\r^1\u0015\u000b\t\fy'a\u001d\t\u000f\u0005E\u0014\u00031\u0001\u0002\u001c\u0005YA/Z:u\u0011\u0006\u0014h.Z:t\u0011\u001d\t\u0019'\u0005a\u0001\u0003K\nA\"\u001b8tKJ$(+Z2pe\u0012$B!!\u001f\u0002\bB1\u00111PAB\u0003Wi!!! 
\u000b\t\u0005}\u0014\u0011Q\u0001\rgR\u0014X-Y7sK\u000e|'\u000f\u001a\u0006\u0004G\u0005\r\u0012\u0002BAC\u0003{\u0012Ab\u0015;sK\u0006l'+Z2pe\u0012Dq!!#\u0013\u0001\u0004\tY)\u0001\u0003be\u001e\u001c\b#B2\u0002\u000e\u0006E\u0015bAAHI\nQAH]3qK\u0006$X\r\u001a \u0011\u0007\r\f\u0019*C\u0002\u0002\u0016\u0012\u00141!\u00118z\u0003=\u0019\u0007.\u00198hK2|wMU3d_J$GCBA=\u00037\u000bI\u000bC\u0004\u0002\u001eN\u0001\r!a(\u0002\u000fI|woS5oIB!\u0011\u0011UAS\u001b\t\t\u0019KC\u0002\u0002H!JA!a*\u0002$\n9!k\\<LS:$\u0007bBAE'\u0001\u0007\u00111R\u0001\u000bY>\u001c\u0017\r\\'jY2\u001cH\u0003BAX\u0003k\u0003B!!\f\u00022&!\u00111WA\u0018\u00055!\u0016.\\3ti\u0006l\u0007\u000fR1uC\"9\u0011q\u0017\u000bA\u0002\u0005E\u0013\u0001\u00033bi\u0016$\u0016.\\3)\u000f\u0001\tY,a2\u0002JB!\u0011QXAb\u001b\t\tyLC\u0002\u0002B.\f\u0011\"\u001a=uK:\u001c\u0018n\u001c8\n\t\u0005\u0015\u0017q\u0018\u0002\u000b\u000bb$XM\u001c3XSRD\u0017!\u0002<bYV,GFAAfG\t\ti\r\u0005\u0003\u0002P\u0006}WBAAi\u0015\u0011\t\u0019.!6\u0002\u001bA\f'/Y7fi\u0016\u0014\u0018N_3e\u0015\u0011\t9.!7\u0002\u0015\u0015DH/\u001a8tS>t7OC\u0002q\u00037T1!!8)\u0003%!Xm\u001d;vi&d7/\u0003\u0003\u0002b\u0006E'A\u0007)be\u0006lW\r^3sSj,G\rV3ti\u0016CH/\u001a8tS>t\u0017AG,j]\u0012|w/Q4he\u0016<\u0017\r^3ICJtWm]:UKN$\bCA\u0019\u0017'\r1\u0012\u0011\u001e\t\u0004G\u0006-\u0018bAAwI\n1\u0011I\\=SK\u001a$\"!!:\u0002\u0015A\f'/Y7fi\u0016\u00148\u000f\u0006\u0002\u0002vB1\u0011q_A~\u0003\u007fl!!!?\u000b\u0007\u0005\u0005B+\u0003\u0003\u0002~\u0006e(AC\"pY2,7\r^5p]B)1-!\u000f\u0003\u0002A!!1\u0001B\u0005\u001b\t\u0011)AC\u0002\u0003\bQ\u000bA\u0001\\1oO&!!1\u0002B\u0003\u0005\u0019y%M[3di\":\u0001Da\u0004\u0003\u0016\t]\u0001\u0003BAh\u0005#IAAa\u0005\u0002R\nQ\u0001+\u0019:b[\u0016$XM]:\u0002\t9\fW.Z\u0011\u0003\u00053\tad\u0015;bi\u0016\u0014\u0015mY6f]\u0012l4\u0010M?-AQKW.\u001a.p]\u0016l40M?\u0002\rQ+VJ\u0011'F+\t\t\t&A\u0004U+6\u0013E*\u0012\u0011\u0002\u0007!{\u0005+\u0001\u0003I\u001fB\u0003\u0013\u0001C\"V\u001bVc\u0015
\tV#\u0002\u0013\r+V*\u0016'B)\u0016\u0003\u0003")
public class WindowAggregateHarnessTest
extends HarnessTestBase {
    // Zone that before() installs as the table environment's local time zone.
    private final ZoneId shiftTimeZone;
    // Exposed through the private UTC_ZONE_ID() accessor; where it is assigned is not visible in this part of the file.
    private final ZoneId UTC_ZONE_ID;

    /**
     * Static forwarder to {@code CUMULATE()} on the {@code WindowAggregateHarnessTest$} singleton.
     *
     * @return the string held by the singleton; tests in this class pass it as the first argument
     *     of {@code createProcessingTimeWindowOperator}
     */
    public static String CUMULATE() {
        final WindowAggregateHarnessTest$ companion = WindowAggregateHarnessTest$.MODULE$;
        return companion.CUMULATE();
    }

    /**
     * Static forwarder to {@code HOP()} on the {@code WindowAggregateHarnessTest$} singleton.
     *
     * @return the string held by the singleton; tests in this class pass it as the first argument
     *     of {@code createProcessingTimeWindowOperator}
     */
    public static String HOP() {
        final WindowAggregateHarnessTest$ companion = WindowAggregateHarnessTest$.MODULE$;
        return companion.HOP();
    }

    /**
     * Static forwarder to {@code TUMBLE()} on the {@code WindowAggregateHarnessTest$} singleton.
     *
     * @return the string held by the singleton; tests in this class pass it as the first argument
     *     of {@code createProcessingTimeWindowOperator}
     */
    public static String TUMBLE() {
        final WindowAggregateHarnessTest$ companion = WindowAggregateHarnessTest$.MODULE$;
        return companion.TUMBLE();
    }

    /**
     * Supplies the parameter sets for this parameterized test class by delegating to the
     * {@code WindowAggregateHarnessTest$} singleton.
     *
     * @return the collection of argument arrays produced by the singleton; per the display-name
     *     template, slot 0 is shown as the state backend and slot 1 as the time zone
     */
    @Parameters(name="StateBackend={0}, TimeZone={1}")
    public static Collection<Object[]> parameters() {
        final Collection<Object[]> parameterSets = WindowAggregateHarnessTest$.MODULE$.parameters();
        return parameterSets;
    }

    /**
     * Accessor for the {@code UTC_ZONE_ID} field.
     *
     * @return the value currently stored in the field
     */
    private ZoneId UTC_ZONE_ID() {
        final ZoneId zone = this.UTC_ZONE_ID;
        return zone;
    }

    /**
     * Per-test setup: runs the parent setup, applies {@code shiftTimeZone} as the session's local
     * time zone, then registers two test data sets and creates the tables {@code T1} and
     * {@code T1_CDC} on top of them through the {@code values} connector.
     *
     * <p>The DDL is written with a leading {@code |} margin on every line and passed through
     * {@code stripMargin()} before execution.
     */
    @Override
    @BeforeEach
    public void before() {
        super.before();
        this.tEnv().getConfig().setLocalTimeZone(this.shiftTimeZone);

        // Table T1: backed by the data registered from windowDataWithTimestamp().
        String insertOnlyDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        String insertOnlyDdl = "\n                       |CREATE TABLE T1 ("
            + "\n                       | `ts` STRING,"
            + "\n                       | `int` INT,"
            + "\n                       | `double` DOUBLE,"
            + "\n                       | `float` FLOAT,"
            + "\n                       | `bigdec` DECIMAL(10, 2),"
            + "\n                       | `string` STRING,"
            + "\n                       | `name` STRING,"
            + "\n                       | proctime AS PROCTIME()"
            + "\n                       |) WITH ("
            + "\n                       | 'connector' = 'values',"
            + "\n                       | 'data-id' = '" + insertOnlyDataId + "'"
            + "\n                       |)"
            + "\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertOnlyDdl)).stripMargin());

        // Table T1_CDC: same columns, backed by windowChangelogDataWithTimestamp() and declared
        // with changelog mode I,UA,UB,D.
        String changelogDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowChangelogDataWithTimestamp());
        String changelogDdl = "\n                       |CREATE TABLE T1_CDC ("
            + "\n                       | `ts` STRING,"
            + "\n                       | `int` INT,"
            + "\n                       | `double` DOUBLE,"
            + "\n                       | `float` FLOAT,"
            + "\n                       | `bigdec` DECIMAL(10, 2),"
            + "\n                       | `string` STRING,"
            + "\n                       | `name` STRING,"
            + "\n                       | proctime AS PROCTIME()"
            + "\n                       |) WITH ("
            + "\n                       | 'connector' = 'values',"
            + "\n                       | 'data-id' = '" + changelogDataId + "',"
            + "\n                       | 'changelog-mode' = 'I,UA,UB,D'"
            + "\n                       |)"
            + "\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(changelogDdl)).stripMargin());
    }

    /**
     * Builds the processing-time window operator harness for {@code TUMBLE} with the boolean flag
     * set to {@code false}, feeds it through {@code ingestData}, and asserts that the sorted
     * output equals the expected insert records.
     *
     * <p>Each expected row is {key, long, double-or-null, long, timestamp, timestamp}.
     * NOTE(review): the last two columns look like window start/end and the middle three like
     * aggregate results, but the query lives in createProcessingTimeWindowOperator, which is
     * outside this view - confirm there.
     */
    @TestTemplate
    public void testProcessingTimeTumbleWindow() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> operatorAndTypes = this.createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.TUMBLE(), false);
        if (operatorAndTypes == null) {
            throw new MatchError(operatorAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = operatorAndTypes._1();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(operatorAndTypes._2());

        harness.open();
        this.ingestData(harness, false);

        Object[][] expectedRows = {
            {"a", 4L, 5.0, 2L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:05")},
            {"a", 1L, null, 1L, this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0, 2L, this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:10")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:20")},
            {"b", 1L, 3.0, 1L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:35")},
            {null, 1L, 7.0, 0L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:35")},
        };
        ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
        for (Object[] row : expectedRows) {
            expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)harness.getOutput());
        harness.close();
    }

    /**
     * Builds the processing-time window operator harness for {@code TUMBLE} with the boolean flag
     * set to {@code true}, feeds it through {@code ingestData}, and asserts that the sorted
     * output equals the expected insert records.
     *
     * <p>NOTE(review): the flag presumably selects the changelog-backed input (cf. the
     * {@code WithCDCSource} suffix); confirm in createProcessingTimeWindowOperator / ingestData,
     * which are outside this view. Each expected row is
     * {key, long, double-or-null, long, timestamp, timestamp}.
     */
    @TestTemplate
    public void testProcessingTimeTumbleWindowWithCDCSource() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> operatorAndTypes = this.createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.TUMBLE(), true);
        if (operatorAndTypes == null) {
            throw new MatchError(operatorAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = operatorAndTypes._1();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(operatorAndTypes._2());

        harness.open();
        this.ingestData(harness, true);

        Object[][] expectedRows = {
            {"a", 3L, 22.0, 2L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:05")},
            {"a", 1L, null, 1L, this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0, 2L, this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:10")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:20")},
        };
        ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
        for (Object[] row : expectedRows) {
            expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)harness.getOutput());
        harness.close();
    }

    /**
     * Builds the processing-time window operator harness for {@code HOP} with the boolean flag
     * set to {@code false}, feeds it through {@code ingestData}, and asserts that the sorted
     * output equals the expected insert records.
     *
     * <p>Each expected row is {key, long, double-or-null, long, timestamp, timestamp}; the
     * timestamp pairs here are 10 seconds apart and start on 5-second boundaries.
     * NOTE(review): the actual window size/slide is defined in
     * createProcessingTimeWindowOperator, outside this view - confirm there.
     */
    @TestTemplate
    public void testProcessingTimeHopWindow() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> operatorAndTypes = this.createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.HOP(), false);
        if (operatorAndTypes == null) {
            throw new MatchError(operatorAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = operatorAndTypes._1();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(operatorAndTypes._2());

        harness.open();
        this.ingestData(harness, false);

        Object[][] expectedRows = {
            {"a", 4L, 5.0, 2L, this.localMills("1969-12-31T23:59:55"), this.localMills("1970-01-01T00:00:05")},
            {"a", 5L, 5.0, 3L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")},
            {"a", 1L, null, 1L, this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:15")},
            {"b", 2L, 6.0, 2L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0, 2L, this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:15")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:10"), this.localMills("1970-01-01T00:00:20")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:25")},
            {"b", 1L, 3.0, 1L, this.localMills("1970-01-01T00:00:25"), this.localMills("1970-01-01T00:00:35")},
            {"b", 1L, 3.0, 1L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:40")},
            {null, 1L, 7.0, 0L, this.localMills("1970-01-01T00:00:25"), this.localMills("1970-01-01T00:00:35")},
            {null, 1L, 7.0, 0L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:40")},
        };
        ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
        for (Object[] row : expectedRows) {
            expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)harness.getOutput());
        harness.close();
    }

    /**
     * Builds the processing-time window operator harness for {@code HOP} with the boolean flag
     * set to {@code true}, feeds it through {@code ingestData}, and asserts that the sorted
     * output equals the expected insert records.
     *
     * <p>NOTE(review): the flag presumably selects the changelog-backed input (cf. the
     * {@code WithCDCSource} suffix); confirm in createProcessingTimeWindowOperator / ingestData,
     * which are outside this view. Each expected row is
     * {key, long, double-or-null, long, timestamp, timestamp}.
     */
    @TestTemplate
    public void testProcessingTimeHopWindowWithCDCSource() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> operatorAndTypes = this.createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.HOP(), true);
        if (operatorAndTypes == null) {
            throw new MatchError(operatorAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = operatorAndTypes._1();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(operatorAndTypes._2());

        harness.open();
        this.ingestData(harness, true);

        Object[][] expectedRows = {
            {"a", 3L, 22.0, 2L, this.localMills("1969-12-31T23:59:55"), this.localMills("1970-01-01T00:00:05")},
            {"a", 4L, 22.0, 3L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")},
            {"a", 1L, null, 1L, this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:15")},
            {"b", 2L, 6.0, 2L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0, 2L, this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:15")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:10"), this.localMills("1970-01-01T00:00:20")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:25")},
        };
        ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
        for (Object[] row : expectedRows) {
            expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)harness.getOutput());
        harness.close();
    }

    /**
     * Builds the processing-time window operator harness for {@code CUMULATE} with the boolean
     * flag set to {@code false}, feeds it through {@code ingestData}, and asserts that the sorted
     * output equals the expected insert records.
     *
     * <p>Each expected row is {key, long, double-or-null, long, timestamp, timestamp}; within a
     * group the first timestamp stays fixed while the second advances in 5-second steps.
     * NOTE(review): the actual step/size is defined in createProcessingTimeWindowOperator,
     * outside this view - confirm there.
     */
    @TestTemplate
    public void testProcessingTimeCumulateWindow() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> operatorAndTypes = this.createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.CUMULATE(), false);
        if (operatorAndTypes == null) {
            throw new MatchError(operatorAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = operatorAndTypes._1();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(operatorAndTypes._2());

        harness.open();
        this.ingestData(harness, false);

        Object[][] expectedRows = {
            {"a", 4L, 5.0, 2L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:05")},
            {"a", 5L, 5.0, 3L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")},
            {"a", 5L, 5.0, 3L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:15")},
            {"b", 2L, 6.0, 2L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")},
            {"b", 2L, 6.0, 2L, this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:15")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:20")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:25")},
            {"b", 1L, 4.0, 1L, this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:30")},
            {"b", 1L, 3.0, 1L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:35")},
            {"b", 1L, 3.0, 1L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:40")},
            {"b", 1L, 3.0, 1L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:45")},
            {null, 1L, 7.0, 0L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:35")},
            {null, 1L, 7.0, 0L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:40")},
            {null, 1L, 7.0, 0L, this.localMills("1970-01-01T00:00:30"), this.localMills("1970-01-01T00:00:45")},
        };
        ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
        for (Object[] row : expectedRows) {
            expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray(row)));
        }

        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)harness.getOutput());
        harness.close();
    }

    /**
     * Builds the CUMULATE processing-time window operator over the CDC table, feeds it the
     * changelog input produced by {@code ingestData(..., true)} and compares the sorted harness
     * output against the rows listed below.
     */
    @TestTemplate
    public void testProcessingTimeCumulateWindowWithCDCSource() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> operatorAndTypes = this.createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.CUMULATE(), true);
        if (operatorAndTypes == null) {
            throw new MatchError(operatorAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = operatorAndTypes._1();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(operatorAndTypes._2());

        harness.open();
        this.ingestData(harness, true);

        // Each expected row: string key, long, double, long, then two timestamps
        // (presumably name, COUNT(*), MAX, COUNT(DISTINCT), window_start, window_end -- confirm against the operator's output layout).
        ConcurrentLinkedQueue<StreamRecord<RowData>> expected = new ConcurrentLinkedQueue<StreamRecord<RowData>>();
        // Key "a", windows starting at 00:00:00.
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToDouble(22.0), BoxesRunTime.boxToLong(2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:05")})));
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToDouble(22.0), BoxesRunTime.boxToLong(3L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")})));
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToDouble(22.0), BoxesRunTime.boxToLong(3L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:15")})));
        // Key "b", windows starting at 00:00:00.
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToDouble(6.0), BoxesRunTime.boxToLong(2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:10")})));
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToDouble(6.0), BoxesRunTime.boxToLong(2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:15")})));
        // Key "b", windows starting at 00:00:15.
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToDouble(4.0), BoxesRunTime.boxToLong(1L), this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:20")})));
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToDouble(4.0), BoxesRunTime.boxToLong(1L), this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:25")})));
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToDouble(4.0), BoxesRunTime.boxToLong(1L), this.localMills("1970-01-01T00:00:15"), this.localMills("1970-01-01T00:00:30")})));

        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)harness.getOutput());
        harness.close();
    }

    /**
     * Sets up a TUMBLE processing-time window operator harness and closes it straight away,
     * without ever calling {@code open()}.
     */
    @TestTemplate
    public void testCloseWithoutOpen() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> operatorAndTypes = this.createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.TUMBLE(), false);
        if (operatorAndTypes == null) {
            throw new MatchError(operatorAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = operatorAndTypes._1();
        LogicalType[] rowTypes = operatorAndTypes._2();

        // Only setup() followed by close(): the operator is intentionally never opened.
        harness.setup((TypeSerializer)new RowDataSerializer(rowTypes));
        harness.close();
    }

    /**
     * Plans a tumbling-window aggregation with the TWO_PHASE aggregate strategy, then creates
     * harnesses for the "LocalWindowAggregate" and "GlobalWindowAggregate" operators and closes
     * each one after {@code setup()} only, never calling {@code open()}.
     */
    @TestTemplate
    public void testTwoPhaseWindowAggregateCloseWithoutOpen() {
        // Register the test rows and create table T2 on top of them via the 'values' connector.
        String registeredDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        String createTableDdl = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(723).append("\n                       |CREATE TABLE T2 (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `double` DOUBLE,\n                       | `float` FLOAT,\n                       | `bigdec` DECIMAL(10, 2),\n                       | `string` STRING,\n                       | `name` STRING,\n                       | `rowtime` AS\n                       | TO_TIMESTAMP(`ts`),\n                       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '").append(registeredDataId).append("',\n                       | 'failing-source' = 'false'\n                       |)\n                       |").toString())).stripMargin();
        this.tEnv().executeSql(createTableDdl);

        // The strategy is set to TWO_PHASE before planning; the harness lookups below rely on
        // operators named "LocalWindowAggregate" and "GlobalWindowAggregate".
        this.tEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE);

        String aggregateQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(*),\n        |  MAX(`double`),\n        |  COUNT(DISTINCT `string`)\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n      ")).stripMargin();
        Table resultTable = this.tEnv().sqlQuery(aggregateQuery);
        DataStream resultStream = package$.MODULE$.tableConversions(resultTable).toDataStream();

        // Local operator: setup() then close(), with no open() in between.
        OneInputStreamOperatorTestHarness<RowData, RowData> localHarness = this.createHarnessTesterForNoState(resultStream, "LocalWindowAggregate");
        LogicalType[] rowTypes = new LogicalType[]{
            DataTypes.STRING().getLogicalType(),
            DataTypes.BIGINT().getLogicalType(),
            DataTypes.DOUBLE().getLogicalType(),
            DataTypes.BIGINT().getLogicalType(),
            DataTypes.TIMESTAMP_LTZ(3).getLogicalType(),
            DataTypes.TIMESTAMP_LTZ(3).getLogicalType()
        };
        localHarness.setup((TypeSerializer)new RowDataSerializer(rowTypes));
        localHarness.close();

        // Global operator: same setup()/close() sequence.
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> globalHarness = this.createHarnessTester(resultStream, "GlobalWindowAggregate");
        globalHarness.setup((TypeSerializer)new RowDataSerializer(rowTypes));
        globalHarness.close();
    }

    /**
     * Sends a watermark of 10000 to a TUMBLE processing-time window operator before any element,
     * then feeds four elements at processing times 1000-6000 and advances processing time to
     * 50000. The expected output is the forwarded watermark followed by one row per 5-second
     * window.
     */
    @TestTemplate
    public void testProcessingTimeTumbleWindowWithFutureWatermark() {
        Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> operatorAndTypes = this.createProcessingTimeWindowOperator(WindowAggregateHarnessTest$.MODULE$.TUMBLE(), false);
        if (operatorAndTypes == null) {
            throw new MatchError(operatorAndTypes);
        }
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = operatorAndTypes._1();
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(operatorAndTypes._2());

        harness.open();

        // The watermark value (10000) is larger than the processing time of every element below.
        harness.processWatermark(10000L);

        harness.setProcessingTime(1000L);
        harness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(1.0), "str1", null})));
        harness.setProcessingTime(2000L);
        harness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(2.0), "str2", null})));
        harness.setProcessingTime(3000L);
        harness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(3.0), "str2", null})));
        harness.setProcessingTime(6000L);
        harness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(4.0), "str1", null})));
        harness.setProcessingTime(50000L);

        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new Watermark(10000L));
        // Window [00:00:00, 00:00:05) and window [00:00:05, 00:00:10) for key "a".
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToDouble(3.0), BoxesRunTime.boxToLong(2L), this.localMills("1970-01-01T00:00:00"), this.localMills("1970-01-01T00:00:05")})));
        expected.add(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToDouble(4.0), BoxesRunTime.boxToLong(1L), this.localMills("1970-01-01T00:00:05"), this.localMills("1970-01-01T00:00:10")})));

        assertor.assertOutputEqualsSorted("result mismatch", expected, (Collection)harness.getOutput());
        harness.close();
    }

    /**
     * Plans a processing-time window aggregation and wraps the operator named "WindowAggregate"
     * in a keyed test harness.
     *
     * @param testWindow  window kind; must equal one of the TUMBLE, HOP or CUMULATE constants of
     *                    {@code WindowAggregateHarnessTest$}
     * @param isCDCSource when true the window reads table {@code T1_CDC}, otherwise {@code T1}
     * @return the harness paired with the logical types used to check its output rows
     *         (STRING, BIGINT, DOUBLE, BIGINT, TIMESTAMP_LTZ(3), TIMESTAMP_LTZ(3))
     * @throws MatchError if {@code testWindow} matches none of the three window kinds
     */
    private Tuple2<KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>, LogicalType[]> createProcessingTimeWindowOperator(String testWindow, boolean isCDCSource) {
        String sourceTable = isCDCSource ? "T1_CDC" : "T1";

        // Try TUMBLE, then HOP, then CUMULATE; each constant is only fetched if the previous one did not match.
        String windowDDL = null;
        String candidate = WindowAggregateHarnessTest$.MODULE$.TUMBLE();
        if (candidate == null ? testWindow == null : candidate.equals(testWindow)) {
            windowDDL = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(138).append("\n           |TUMBLE(\n           |      TABLE ").append((Object)sourceTable).append(",\n           |      DESCRIPTOR(proctime),\n           |      INTERVAL '5' SECOND)\n           |").toString())).stripMargin();
        }
        if (windowDDL == null) {
            candidate = WindowAggregateHarnessTest$.MODULE$.HOP();
            if (candidate == null ? testWindow == null : candidate.equals(testWindow)) {
                windowDDL = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(175).append("\n           |HOP(\n           |      TABLE ").append((Object)sourceTable).append(",\n           |      DESCRIPTOR(proctime),\n           |      INTERVAL '5' SECOND,\n           |      INTERVAL '10' SECOND)\n           |").toString())).stripMargin();
            }
        }
        if (windowDDL == null) {
            candidate = WindowAggregateHarnessTest$.MODULE$.CUMULATE();
            if (candidate == null ? testWindow == null : candidate.equals(testWindow)) {
                windowDDL = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(180).append("\n           |CUMULATE(\n           |      TABLE ").append((Object)sourceTable).append(",\n           |      DESCRIPTOR(proctime),\n           |      INTERVAL '5' SECOND,\n           |      INTERVAL '15' SECOND)\n           |").toString())).stripMargin();
            }
        }
        if (windowDDL == null) {
            throw new MatchError((Object)testWindow);
        }

        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(255).append("\n         |SELECT\n         |  `name`,\n         |  window_start,\n         |  window_end,\n         |  COUNT(*),\n         |  MAX(`double`),\n         |  COUNT(DISTINCT `string`)\n         |FROM TABLE(").append(windowDDL).append(")\n         |GROUP BY `name`, window_start, window_end\n      ").toString())).stripMargin();
        Table aggregated = this.tEnv().sqlQuery(sql);
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = this.createHarnessTester(package$.MODULE$.tableConversions(aggregated).toDataStream(), "WindowAggregate");

        LogicalType[] rowTypes = new LogicalType[]{
            DataTypes.STRING().getLogicalType(),
            DataTypes.BIGINT().getLogicalType(),
            DataTypes.DOUBLE().getLogicalType(),
            DataTypes.BIGINT().getLogicalType(),
            DataTypes.TIMESTAMP_LTZ(3).getLogicalType(),
            DataTypes.TIMESTAMP_LTZ(3).getLogicalType()
        };
        return new Tuple2<>(harness, rowTypes);
    }

    /**
     * Feeds a fixed sequence of rows into the harness, setting the processing time before each
     * element or group of elements.
     *
     * <p>Each row has four fields: a string key, a double value (possibly null), a string
     * (possibly null) and a trailing null.
     *
     * @param testHarness harness that receives the elements and processing-time updates
     * @param isCDCSource when true a changelog sequence (INSERT / DELETE / UPDATE_BEFORE /
     *                    UPDATE_AFTER) is sent and processing time ends at 39000; otherwise an
     *                    insert-only sequence is sent and processing time is finally moved to 50000
     */
    private void ingestData(KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness, boolean isCDCSource) {
        if (isCDCSource) {
            testHarness.setProcessingTime(1000L);
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(1.0), "Hi", null})));

            testHarness.setProcessingTime(2000L);
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(2.0), "Comment#1", null})));

            // Delete the first row and insert a second copy of ("a", 2.0, "Comment#1").
            testHarness.setProcessingTime(3000L);
            testHarness.processElement(this.changelogRecord(RowKind.DELETE, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(1.0), "Hi", null})));
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(2.0), "Comment#1", null})));

            // One insert plus an UPDATE_BEFORE / UPDATE_AFTER pair (2.0 -> 22.0).
            testHarness.setProcessingTime(4000L);
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(5.0), null, null})));
            testHarness.processElement(this.changelogRecord(RowKind.UPDATE_BEFORE, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(2.0), "Comment#1", null})));
            testHarness.processElement(this.changelogRecord(RowKind.UPDATE_AFTER, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(22.0), "Comment#22", null})));

            testHarness.setProcessingTime(6000L);
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(6.0), "Hi", null})));

            testHarness.setProcessingTime(7000L);
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(3.0), "Hello", null})));

            testHarness.setProcessingTime(8000L);
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", null, "Comment#2", null})));

            testHarness.setProcessingTime(16000L);
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(4.0), "Hi", null})));

            // Insert a row at 38000 and delete the same row at 39000.
            testHarness.setProcessingTime(38000L);
            testHarness.processElement(this.changelogRecord(RowKind.INSERT, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(8.0), "Comment#4", null})));
            testHarness.setProcessingTime(39000L);
            testHarness.processElement(this.changelogRecord(RowKind.DELETE, (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(8.0), "Comment#4", null})));
        } else {
            testHarness.setProcessingTime(1000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(1.0), "Hi", null})));

            testHarness.setProcessingTime(2000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(2.0), "Comment#1", null})));

            testHarness.setProcessingTime(3000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(2.0), "Comment#1", null})));

            testHarness.setProcessingTime(4000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", BoxesRunTime.boxToDouble(5.0), null, null})));

            testHarness.setProcessingTime(6000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(6.0), "Hi", null})));

            testHarness.setProcessingTime(7000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(3.0), "Hello", null})));

            testHarness.setProcessingTime(8000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"a", null, "Comment#2", null})));

            testHarness.setProcessingTime(16000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(4.0), "Hi", null})));

            // Row with a null key.
            testHarness.setProcessingTime(32000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{null, BoxesRunTime.boxToDouble(7.0), null, null})));

            testHarness.setProcessingTime(34000L);
            testHarness.processElement(this.insertRecord((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"b", BoxesRunTime.boxToDouble(3.0), "Comment#3", null})));

            // Final processing-time advance after the last element.
            testHarness.setProcessingTime(50000L);
        }
    }

    /**
     * Builds a stream record of kind {@link RowKind#INSERT} from the given field values via
     * {@code StreamRecordUtils.binaryRecord}.
     *
     * @param args field values of the row, in order
     * @return the stream record produced by {@code StreamRecordUtils.binaryRecord}
     */
    private StreamRecord<RowData> insertRecord(Seq<Object> args) {
        TraversableOnce normalized = (TraversableOnce)args.map((Function1 & Serializable & scala.Serializable)field -> {
            // Long and Double values are unboxed and re-boxed through BoxesRunTime; any other value passes through unchanged.
            if (field instanceof Long) {
                return BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(field));
            }
            if (field instanceof Double) {
                return BoxesRunTime.boxToDouble(BoxesRunTime.unboxToDouble(field));
            }
            return field;
        }, Seq$.MODULE$.canBuildFrom());
        Object[] fields = (Object[])normalized.toArray(ClassTag$.MODULE$.Object());
        return StreamRecordUtils.binaryRecord(RowKind.INSERT, fields);
    }

    /**
     * Builds a stream record of the given row kind from the given field values via
     * {@code StreamRecordUtils.binaryRecord}.
     *
     * @param rowKind changelog kind to attach to the record
     * @param args    field values of the row, in order
     * @return the stream record produced by {@code StreamRecordUtils.binaryRecord}
     */
    private StreamRecord<RowData> changelogRecord(RowKind rowKind, Seq<Object> args) {
        TraversableOnce normalized = (TraversableOnce)args.map((Function1 & Serializable & scala.Serializable)field -> {
            // Long and Double values are unboxed and re-boxed through BoxesRunTime; any other value passes through unchanged.
            if (field instanceof Long) {
                return BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(field));
            }
            if (field instanceof Double) {
                return BoxesRunTime.boxToDouble(BoxesRunTime.unboxToDouble(field));
            }
            return field;
        }, Seq$.MODULE$.canBuildFrom());
        Object[] fields = (Object[])normalized.toArray(ClassTag$.MODULE$.Object());
        return StreamRecordUtils.binaryRecord(rowKind, fields);
    }

    /**
     * Converts an ISO local date-time string into a {@link TimestampData}.
     *
     * <p>The string is parsed as a {@link LocalDateTime}, placed in the zone returned by
     * {@code UTC_ZONE_ID()} and turned into epoch milliseconds. That value is then passed
     * through {@code TimeWindowUtil.toUtcTimestampMills} together with this test's
     * {@code shiftTimeZone} before being wrapped.
     *
     * @param dateTime local date-time text accepted by {@link LocalDateTime#parse(CharSequence)}
     * @return the resulting timestamp
     */
    private TimestampData localMills(String dateTime) {
        long epochMillis = LocalDateTime.parse(dateTime)
            .atZone(this.UTC_ZONE_ID())
            .toInstant()
            .toEpochMilli();
        long shiftedMillis = TimeWindowUtil.toUtcTimestampMills(epochMillis, this.shiftTimeZone);
        return TimestampData.fromEpochMillis(shiftedMillis);
    }

    /**
     * Creates the test instance for one parameter combination.
     *
     * @param backend       state backend mode, forwarded to the superclass constructor
     * @param shiftTimeZone time zone stored on this instance; {@code localMills} passes it to
     *                      {@code TimeWindowUtil.toUtcTimestampMills}
     */
    public WindowAggregateHarnessTest(StreamingWithStateTestBase.StateBackendMode backend, ZoneId shiftTimeZone) {
        // The explicit superclass constructor call must come first: a field assignment placed
        // before super(...) is rejected by javac prior to flexible constructor bodies.
        // NOTE(review): assumes the superclass constructor does not read shiftTimeZone through
        // an overridden method -- confirm against HarnessTestBase.
        super(backend);
        this.shiftTimeZone = shiftTimeZone;
        this.UTC_ZONE_ID = ZoneId.of("UTC");
    }
}

