/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Collection;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.api.typeutils.CaseClassTypeInfo;
import org.apache.flink.table.api.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase$;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.assertj.core.api.AbstractAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\t=h\u0001B#G\u0001]C\u0001B\u0018\u0001\u0003\u0002\u0003\u0006Ia\u0018\u0005\tO\u0002\u0011\t\u0011)A\u0005Q\"Q\u0011\u0011\u0001\u0001\u0003\u0002\u0003\u0006I!a\u0001\t\u0015\u0005=\u0001A!A!\u0002\u0013\t\u0019\u0001C\u0004\u0002\u0012\u0001!\t!a\u0005\t\u0013\u0005\u0005\u0002A1A\u0005\u0002\u0005\r\u0002\u0002CA!\u0001\u0001\u0006I!!\n\t\u0013\u0005\r\u0003A1A\u0005\u0002\u0005\r\u0002\u0002CA#\u0001\u0001\u0006I!!\n\t\u0013\u0005\u001d\u0003A1A\u0005\u0002\u0005\r\u0002\u0002CA%\u0001\u0001\u0006I!!\n\t\u0013\u0005-\u0003A1A\u0005\u0002\u0005\r\u0002\u0002CA'\u0001\u0001\u0006I!!\n\t\u0013\u0005=\u0003A1A\u0005\u0002\u0005\r\u0002\u0002CA)\u0001\u0001\u0006I!!\n\t\u0013\u0005M\u0003A1A\u0005\u0002\u0005\r\u0002\u0002CA+\u0001\u0001\u0006I!!\n\t\u0013\u0005]\u0003A1A\u0005\u0002\u0005\r\u0002\u0002CA-\u0001\u0001\u0006I!!\n\t\u0013\u0005m\u0003A1A\u0005\u0002\u0005\r\u0002\u0002CA/\u0001\u0001\u0006I!!\n\t\u0013\u0005}\u0003A1A\u0005\u0002\u0005\r\u0002\u0002CA1\u0001\u0001\u0006I!!\n\t\u0013\u0005\r\u0004A1A\u0005\u0002\u0005\u0015\u0004\u0002CA:\u0001\u0001\u0006I!a\u001a\t\u000f\u0005U\u0004\u0001\"\u0011\u0002x!9\u00111\u0013\u0001\u0005\u0002\u0005]\u0004bBAO\u0001\u0011\u0005\u0011q\u000f\u0005\b\u0003C\u0003A\u0011AA<\u0011\u001d\t)\u000b\u0001C\u0001\u0003oBq!!+\u0001\t\u0003\t9\bC\u0004\u0002.\u0002!\t!a\u001e\t\u000f\u0005E\u0006\u0001\"\u0001\u0002x!9\u0011Q\u0017\u0001\u0005\u0002\u0005]\u0004bBA]\u0001\u0011\u0005\u0011q\u000f\u0005\b\u0003{\u0003A\u0011AA<\u0011\u001d\t\t\r\u0001C\u0001\u0003oBq!!2\u0001\t\u0003\t9\bC\u0004\u0002J\u0002!\t!a\u001e\t\u000f\u00055\u0007\u0001\"\u0001\u0002x!9\u0011\u0011\u001b\u0001\u0005\u0002\u0005]\u0004bBAk\u0001\u0011\u0005\u0011q\u000f\u0005\b\u00033\u0004A\u0011AA<\u0011\u001d\ti\u000e\u0001C\u0001\u0003oBq!!9\u0001\t\u0003\t9\bC\u0004\u0002f\u0002!\t!a\u001e\t\u000f\u0005%\b\u0001\"\u0001\u0002x!9\u0011Q\u001e\u0001\u0005\u0002\u0005]\u0004bBAy\u0001\u0011\u0005\u0011q\u0
00f\u0005\b\u0003k\u0004A\u0011AA<\u0011\u001d\tI\u0010\u0001C\u0001\u0003oBq!!@\u0001\t\u0003\t9\bC\u0004\u0003\u0002\u0001!\t!a\u001e\t\u000f\t\u0015\u0001\u0001\"\u0001\u0002x!9!\u0011\u0002\u0001\u0005\u0002\u0005]\u0004b\u0002B\u0007\u0001\u0011\u0005\u0011q\u000f\u0005\b\u0005#\u0001A\u0011AA<\u0011\u001d\u0011)\u0002\u0001C\u0001\u0003oBqA!\u0007\u0001\t\u0003\t9\bC\u0004\u0003\u001e\u0001!IAa\b\t\u0013\t-\u0003!%A\u0005\n\t5\u0003b\u0002B2\u0001\u0011%!Q\r\u0005\b\u0005_\u0002A\u0011\u0002B9\u0011\u001d\u0011\u0019\t\u0001C\u0005\u0005\u000b;qAa.G\u0011\u0003\u0011IL\u0002\u0004F\r\"\u0005!1\u0018\u0005\b\u0003#\u0011E\u0011\u0001Bb\u0011\u001d\u0011)M\u0011C\u0001\u0005\u000f\u0014QcV5oI><\u0018iZ4sK\u001e\fG/Z%U\u0007\u0006\u001cXM\u0003\u0002H\u0011\u0006\u00191/\u001d7\u000b\u0005%S\u0015AB:ue\u0016\fWN\u0003\u0002L\u0019\u00069!/\u001e8uS6,'BA'O\u0003\u001d\u0001H.\u00198oKJT!a\u0014)\u0002\u000bQ\f'\r\\3\u000b\u0005E\u0013\u0016!\u00024mS:\\'BA*U\u0003\u0019\t\u0007/Y2iK*\tQ+A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u00011B\u0011\u0011\fX\u0007\u00025*\u00111LS\u0001\u0006kRLGn]\u0005\u0003;j\u0013!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016\f\u0001\"Y4h!\"\f7/\u001a\t\u0003A\u0016l\u0011!\u0019\u0006\u0003E\u000e\faaY8oM&<'B\u00013O\u0003\r\t\u0007/[\u0005\u0003M\u0006\u0014a#Q4he\u0016<\u0017\r^3QQ\u0006\u001cXm\u0015;sCR,w-_\u0001\u0006gR\fG/\u001a\t\u0003Svt!A[>\u000f\u0005-ThB\u00017z\u001d\ti\u0007P\u0004\u0002oo:\u0011qN\u001e\b\u0003aVt!!\u001d;\u000e\u0003IT!a\u001d,\u0002\rq\u0012xn\u001c;?\u0013\u0005)\u0016BA*U\u0013\t\t&+\u0003\u0002P!&\u0011QJT\u0005\u0003\u00172K!a\u0017&\n\u0005qT\u0016AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0017B\u0001@\u0000\u0005A\u0019F/\u0019;f\u0005\u0006\u001c7.\u001a8e\u001b>$WM\u0003\u0002}5\u0006yQo]3US6,7\u000f^1na2#(\u0010\u0005\u0003\u0002\u0006\u0005-QBAA\u0004\u0015\t\tI!A\u0003tG\u0006d\u0017-\u0003\u0003\u0002\u000e\u0005\u001d!a\u0002\"p_2,\u0017M\\\u0001\u001
1K:\f'\r\\3Bgft7m\u0015;bi\u0016\fa\u0001P5oSRtDCCA\u000b\u00033\tY\"!\b\u0002 A\u0019\u0011q\u0003\u0001\u000e\u0003\u0019CQAX\u0003A\u0002}CQaZ\u0003A\u0002!Dq!!\u0001\u0006\u0001\u0004\t\u0019\u0001C\u0004\u0002\u0010\u0015\u0001\r!a\u0001\u0002AQ+XN\u00197f/&tGm\\<He>,\boU3u\u000bb\u0004Xm\u0019;fI\u0012\u000bG/Y\u000b\u0003\u0003K\u0001b!a\n\u0002.\u0005ERBAA\u0015\u0015\u0011\tY#a\u0002\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u00020\u0005%\"aA*fcB!\u00111GA\u001f\u001b\t\t)D\u0003\u0003\u00028\u0005e\u0012\u0001\u00027b]\u001eT!!a\u000f\u0002\t)\fg/Y\u0005\u0005\u0003\u007f\t)D\u0001\u0004TiJLgnZ\u0001\")Vl'\r\\3XS:$wn^$s_V\u00048+\u001a;FqB,7\r^3e\t\u0006$\u0018\rI\u0001\u001d)Vl'\r\\3XS:$wn^\"vE\u0016,\u0005\u0010]3di\u0016$G)\u0019;b\u0003u!V/\u001c2mK^Kg\u000eZ8x\u0007V\u0014W-\u0012=qK\u000e$X\r\u001a#bi\u0006\u0004\u0013A\b+v[\ndWmV5oI><(k\u001c7mkB,\u0005\u0010]3di\u0016$G)\u0019;b\u0003}!V/\u001c2mK^Kg\u000eZ8x%>dG.\u001e9FqB,7\r^3e\t\u0006$\u0018\rI\u0001\u001e\u0011>\u0004x+\u001b8e_^<%o\\;q'\u0016$X\t\u001f9fGR,G\rR1uC\u0006q\u0002j\u001c9XS:$wn^$s_V\u00048+\u001a;FqB,7\r^3e\t\u0006$\u0018\rI\u0001\u001a\u0011>\u0004x+\u001b8e_^\u001cUOY3FqB,7\r^3e\t\u0006$\u0018-\u0001\u000eI_B<\u0016N\u001c3po\u000e+(-Z#ya\u0016\u001cG/\u001a3ECR\f\u0007%A\u000eI_B<\u0016N\u001c3poJ{G\u000e\\;q\u000bb\u0004Xm\u0019;fI\u0012\u000bG/Y\u0001\u001d\u0011>\u0004x+\u001b8e_^\u0014v\u000e\u001c7va\u0016C\b/Z2uK\u0012$\u0015\r^1!\u0003\t\u001aU/\\;mCR,w+\u001b8e_^<%o\\;q'\u0016$X\t\u001f9fGR,G\rR1uC\u0006\u00193)^7vY\u0006$XmV5oI><xI]8vaN+G/\u0012=qK\u000e$X\r\u001a#bi\u0006\u0004\u0013AH\"v[Vd\u0017\r^3XS:$wn^\"vE\u0016,\u0005\u0010]3di\u0016$G)\u0019;b\u0003}\u0019U/\\;mCR,w+\u001b8e_^\u001cUOY3FqB,7\r^3e\t\u0006$\u0018\rI\u0001!\u0007VlW\u000f\\1uK^Kg\u000eZ8x%>dG.\u001e9FqB,7\r^3e\t\u0006$\u0018-A\u0011Dk6,H.\u0019;f/&tGm\\<S_2dW\u000f]#ya\u0016\u001cG/\u001a3ECR\f\u0007%A\u0007T\u0011\u0006su\tS!J?j{e*R\u000b\u0003\u0003O\u0002B!!\u001b\u0002p5\u0011\u00111\u000e\
u0006\u0005\u0003[\nI$\u0001\u0003uS6,\u0017\u0002BA9\u0003W\u0012aAW8oK&#\u0017AD*I\u0003:;\u0005*Q%`5>sU\tI\u0001\u0007E\u00164wN]3\u0015\u0005\u0005e\u0004\u0003BA\u0003\u0003wJA!! \u0002\b\t!QK\\5uQ\rQ\u0012\u0011\u0011\t\u0005\u0003\u0007\u000by)\u0004\u0002\u0002\u0006*\u0019A-a\"\u000b\t\u0005%\u00151R\u0001\bUV\u0004\u0018\u000e^3s\u0015\r\ti\tV\u0001\u0006UVt\u0017\u000e^\u0005\u0005\u0003#\u000b)I\u0001\u0006CK\u001a|'/Z#bG\"\f\u0011\u0004^3ti\u00163XM\u001c;US6,G+^7cY\u0016<\u0016N\u001c3po\"\u001a1$a&\u0011\t\u0005\r\u0015\u0011T\u0005\u0005\u00037\u000b)I\u0001\u0007UKN$H+Z7qY\u0006$X-A\u0012uKN$XI^3oiRKW.\u001a+v[\ndWmV5oI><x+\u001b;i\u001f\u001a47/\u001a;)\u0007q\t9*\u0001\u0016uKN$8)Y:dC\u0012,WI^3oiRKW.\u001a+v[\ndWmV5oI><x+\u001b;i\u001f\u001a47/\u001a;)\u0007u\t9*A\u0016uKN$XI^3oiRKW.\u001a+v[\ndWmV5oI><x+\u001b;i\u001d\u0016<\u0017\r^5wK>3gm]3uQ\rq\u0012qS\u0001'i\u0016\u001cH/\u0012<f]R$\u0016.\\3Uk6\u0014G.Z,j]\u0012|woX$s_V\u0004\u0018N\\4TKR\u001c\bfA\u0010\u0002\u0018\u0006qB/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V/\u001c2mK^Kg\u000eZ8x?\u000e+(-\u001a\u0015\u0004A\u0005]\u0015\u0001\t;fgR,e/\u001a8u)&lW\rV;nE2,w+\u001b8e_^|&k\u001c7mkBD3!IAL\u0003\u0001\"Xm\u001d;Uk6\u0014G.Z,j]\u0012|woT;uaV$x+\u001b8e_^$\u0016.\\3)\u0007\t\n9*A\u0011uKN$H+^7cY\u0016<\u0016N\u001c3po\u001e\u0013x.\u001e9P]^Kg\u000eZ8x\u001f:d\u0017\u0010K\u0002$\u0003/\u000b!\u0006^3tiR+XN\u00197f/&tGm\\<XSRDw.\u001e;PkR\u0004X\u000f^,j]\u0012|woQ8mk6t7\u000fK\u0002%\u0003/\u000b!\u0005^3ti\u00163XM\u001c;US6,\u0007j\u001c9XS:$wn^,ji\"$\u0015n\u001d;j]\u000e$\bfA\u0013\u0002\u0018\u0006\u0001C/Z:u\u000bZ,g\u000e\u001e+j[\u0016Du\u000e],j]\u0012|woV5uQ>3gm]3uQ\r1\u0013qS\u0001)i\u0016\u001cH/\u0012<f]R$\u0016.\\3I_B<\u0016N\u001c3po^KG\u000f\u001b(fO\u0006$\u0018N^3PM\u001a\u001cX\r\u001e\u0015\u0004O\u0005]\u0015a\t;fgR,e/\u001a8u)&lW\rS8q/&tGm\\<`\u000fJ|W\u000f]5oON+Go\u001d\u0015\u0004Q\u0005]\u0015a\u0007;fgR,e/\u001a8u)&lW\rS8q/&tGm\\<`\u0007V\u0014W\rK\u0002*\u0003/\u000
bQ\u0004^3ti\u00163XM\u001c;US6,\u0007j\u001c9XS:$wn^0S_2dW\u000f\u001d\u0015\u0004U\u0005]\u0015a\u0007;fgR,e/\u001a8u)&lWmQ;nk2\fG/Z,j]\u0012|w\u000fK\u0002,\u0003/\u000bQ\u0005^3ti\u00163XM\u001c;US6,7)^7vY\u0006$XmV5oI><x+\u001b;i\u001f\u001a47/\u001a;)\u00071\n9*A\u0017uKN$XI^3oiRKW.Z\"v[Vd\u0017\r^3XS:$wn^,ji\"tUmZ1uSZ,wJ\u001a4tKRD3!LAL\u0003!\"Xm\u001d;Fm\u0016tG\u000fV5nK\u000e+X.\u001e7bi\u0016<\u0016N\u001c3po~;%o\\;qS:<7+\u001a;tQ\rq\u0013qS\u0001!i\u0016\u001cH/\u0012<f]R$\u0016.\\3Dk6,H.\u0019;f/&tGm\\<`\u0007V\u0014W\rK\u00020\u0003/\u000b!\u0005^3ti\u00163XM\u001c;US6,7)^7vY\u0006$XmV5oI><xLU8mYV\u0004\bf\u0001\u0019\u0002\u0018\u0006)B/Z:u\r&,G\u000e\u001a(b[\u0016\u001cuN\u001c4mS\u000e$\bfA\u0019\u0002\u0018\u0006)C/Z:u%\u0016d\u0017\r\u001f$pe6\u0004&o\\2uS6,7)Y:dC\u0012,w+\u001b8e_^\fum\u001a\u0015\u0004e\u0005]\u0015A\n;fgR,e/\u001a8u)&lW\rV;nE2,w+\u001b8e_^<\u0016\u000e\u001e5D\t\u000e\u001bv.\u001e:dK\"\u001a1'a&\u0002GQ,7\u000f^#wK:$H+[7f\u0011>\u0004x+\u001b8e_^<\u0016\u000e\u001e5D\t\u000e\u001bv.\u001e:dK\"\u001aA'a&\u0002QQ,7\u000f^#wK:$H+[7f\u0007VlW\u000f\\1uK^Kg\u000eZ8x/&$\bn\u0011#D'>,(oY3)\u0007U\n9*\u0001\u0019uKN$(+\u001a;sC\u000e$\bK]3wS>,8o\u00157jG&twm\u0015;bi\u0016<\u0016\u000e\u001e5TY&\u001c\u0017N\\4XS:$wn\u001e\u0015\u0004m\u0005]\u0015A\u0007;fgR,e/\u001a8u)&lWmU3tg&|gnV5oI><\bfA\u001c\u0002\u0018\u00069D/Z:u\u000bZ,g\u000e\u001e+j[\u0016\u001cVm]:j_:<\u0016N\u001c3po^KG\u000f\u001b+W\r:{G\u000fU;mYV\u0003\u0018J\u001c;p/&tGm\\<BO\u001eD3\u0001OAL\u0003\u001d\"Xm\u001d;Fm\u0016tG\u000fV5nKN+7o]5p]^Kg\u000eZ8x/&$\bn\u0011#D'>,(oY3)\u0007e\n9*\u0001\u0019uKN$H)[:uS:\u001cG/Q4h/&$\b.T3sO\u0016|e.\u0012<f]R$\u0016.\\3TKN\u001c\u0018n\u001c8XS:$wn\u001e\u0015\u0004u\u0005]\u0015!\n;fgR\u0004VM]2f]RLG.Z(o\u000bZ,g\u000e\u001e+j[\u0016$V/\u001c2mK^Kg\u000eZ8xQ\rY\u0014qS\u0001\u0010m\u0016\u0014\u0018NZ=XS:$wn^!hORA\u0011\u0011\u0010B\u0011\u0005g\u00119\u0005C\u0004\u0003$q\u0002\rA!\n\u0002\u001bQ4hM\u0012:p[\u000ec\u0017-^:
f!\u0011\u00119Ca\f\u000f\t\t%\"1\u0006\t\u0004c\u0006\u001d\u0011\u0002\u0002B\u0017\u0003\u000f\ta\u0001\u0015:fI\u00164\u0017\u0002BA \u0005cQAA!\f\u0002\b!9!Q\u0007\u001fA\u0002\t]\u0012aD1mY\u0016C\b/Z2uK\u0012$\u0015\r^1\u0011\r\te\"1\tB\u0013\u001d\u0011\u0011YDa\u0010\u000f\u0007E\u0014i$\u0003\u0002\u0002\n%!!\u0011IA\u0004\u0003\u001d\u0001\u0018mY6bO\u0016LA!a\f\u0003F)!!\u0011IA\u0004\u0011%\u0011I\u0005\u0010I\u0001\u0002\u0004\t\u0019!A\u0006jg\u000e#7mU8ve\u000e,\u0017!\u0007<fe&4\u0017pV5oI><\u0018iZ4%I\u00164\u0017-\u001e7uIM*\"Aa\u0014+\t\u0005\r!\u0011K\u0016\u0003\u0005'\u0002BA!\u0016\u0003`5\u0011!q\u000b\u0006\u0005\u00053\u0012Y&A\u0005v]\u000eDWmY6fI*!!QLA\u0004\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0005C\u00129FA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fqD^3sS\u001aLx+\u001b8e_^\fumZ,ji\"<%o\\;qS:<7+\u001a;t)!\tIHa\u001a\u0003j\t5\u0004b\u0002B\u0012}\u0001\u0007!Q\u0005\u0005\b\u0005Wr\u0004\u0019\u0001B\u0013\u0003E9'o\\;qS:<7+\u001a;DY\u0006,8/\u001a\u0005\b\u0005kq\u0004\u0019\u0001B\u001c\u0003A)\u00070Z2vi\u0016\fe\u000e\u001a,fe&4\u0017\u0010\u0006\u0005\u0002z\tM$q\u000fB=\u0011\u001d\u0011)h\u0010a\u0001\u0005K\tQ!];fefDqA!\u000e@\u0001\u0004\u00119\u0004C\u0004\u0003|}\u0002\rA! 
\u0002/9,X\u000eV8Ee>\u0004x+\u001b;i\u0003NLhnY*uCR,\u0007\u0003BA\u0003\u0005\u007fJAA!!\u0002\b\t\u0019\u0011J\u001c;\u00023\u0019LG\u000e^3s)\u0006LG\u000eR1uC&3g*Z2fgN\f'/\u001f\u000b\u0007\u0005o\u00119Ia#\t\u000f\t%\u0005\t1\u0001\u00038\u0005!A-\u0019;b\u0011\u001d\u0011Y\b\u0011a\u0001\u0005{Bs\u0001\u0001BH\u00057\u0013i\n\u0005\u0003\u0003\u0012\n]UB\u0001BJ\u0015\u0011\u0011)*!\"\u0002\u0013\u0015DH/\u001a8tS>t\u0017\u0002\u0002BM\u0005'\u0013!\"\u0012=uK:$w+\u001b;i\u0003\u00151\u0018\r\\;fY\t\u0011yj\t\u0002\u0003\"B!!1\u0015BZ\u001b\t\u0011)K\u0003\u0003\u0003(\n%\u0016!\u00049be\u0006lW\r^3sSj,GM\u0003\u0003\u0003,\n5\u0016AC3yi\u0016t7/[8og*!\u0011Q\u0012BX\u0015\r\u0011\t\fU\u0001\ni\u0016\u001cH/\u001e;jYNLAA!.\u0003&\nQ\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;FqR,gn]5p]\u0006)r+\u001b8e_^\fum\u001a:fO\u0006$X-\u0013+DCN,\u0007cAA\f\u0005N\u0019!I!0\u0011\t\u0005\u0015!qX\u0005\u0005\u0005\u0003\f9A\u0001\u0004B]f\u0014VM\u001a\u000b\u0003\u0005s\u000b!\u0002]1sC6,G/\u001a:t)\t\u0011I\r\u0005\u0004\u0003L\nE'Q[\u0007\u0003\u0005\u001bTAAa4\u0002:\u0005!Q\u000f^5m\u0013\u0011\u0011\u0019N!4\u0003\u0015\r{G\u000e\\3di&|g\u000e\u0005\u0004\u0002\u0006\t]'1\\\u0005\u0005\u00053\f9AA\u0003BeJ\f\u0017\u0010\u0005\u0003\u00024\tu\u0017\u0002\u0002Bp\u0003k\u0011aa\u00142kK\u000e$\bf\u0002#\u0003d\n%(1\u001e\t\u0005\u0005G\u0013)/\u0003\u0003\u0003h\n\u0015&A\u0003)be\u0006lW\r^3sg\u0006!a.Y7fC\t\u0011i/A'BO\u001e\u0004\u0006.Y:f{m\u0004T\u0010\f\u0011Ti\u0006$XMQ1dW\u0016tG-P>2{2\u0002Sk]3US6,7\u000f^1na2#(\u0010I\u001f!wJjH\u0006I#oC\ndW-Q:z]\u000e\u001cF/\u0019;fAu\u00023pM?")
public class WindowAggregateITCase
extends StreamingWithStateTestBase {
    // Aggregate phase strategy for this parameterized run; before() writes it to
    // OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY.
    private final AggregatePhaseStrategy aggPhase;
    // When true, before() registers the "...LtzInShanghai" data sets and declares `ts` as BIGINT
    // converted via TO_TIMESTAMP_LTZ; otherwise `ts` is STRING converted via TO_TIMESTAMP.
    private final boolean useTimestampLtz;
    // Value before() writes to ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED.
    private final boolean enableAsyncState;
    // Expected rows for the TUMBLE tests that go through verifyWindowAggWithGroupingSets
    // (GROUPING SETS / CUBE / ROLLUP). Values are assigned outside this excerpt.
    private final Seq<String> TumbleWindowGroupSetExpectedData;
    private final Seq<String> TumbleWindowCubeExpectedData;
    private final Seq<String> TumbleWindowRollupExpectedData;
    // NOTE(review): presumably the HOP counterparts of the TUMBLE expected-data fields above;
    // their consumers are not visible in this excerpt -- confirm.
    private final Seq<String> HopWindowGroupSetExpectedData;
    private final Seq<String> HopWindowCubeExpectedData;
    private final Seq<String> HopWindowRollupExpectedData;
    // NOTE(review): presumably the CUMULATE counterparts; consumers not visible here -- confirm.
    private final Seq<String> CumulateWindowGroupSetExpectedData;
    private final Seq<String> CumulateWindowCubeExpectedData;
    private final Seq<String> CumulateWindowRollupExpectedData;
    // Zone that before() installs as the table environment's local time zone.
    // NOTE(review): the concrete zone id is assigned outside this excerpt -- confirm.
    private final ZoneId SHANGHAI_ZONE;

    /**
     * Supplies the parameter combinations for the parameterized test extension.
     *
     * <p>Static forwarder to the Scala companion object. Per the display-name pattern, each
     * {@code Object[]} carries four values: aggregate phase, state backend, timestamp-LTZ flag
     * and async-state flag.
     *
     * @return the combinations produced by the companion object's {@code parameters()}
     */
    @Parameters(name="AggPhase={0}, StateBackend={1}, UseTimestampLtz = {2}, EnableAsyncState = {3}")
    public static Collection<Object[]> parameters() {
        final Collection<Object[]> combinations = WindowAggregateITCase$.MODULE$.parameters();
        return combinations;
    }

    /**
     * Accessor for the expected rows of the TUMBLE window GROUPING SETS test.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> TumbleWindowGroupSetExpectedData() {
        final Seq<String> rows = this.TumbleWindowGroupSetExpectedData;
        return rows;
    }

    /**
     * Accessor for the expected rows of the TUMBLE window CUBE test.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> TumbleWindowCubeExpectedData() {
        final Seq<String> rows = this.TumbleWindowCubeExpectedData;
        return rows;
    }

    /**
     * Accessor for the expected rows of the TUMBLE window ROLLUP test.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> TumbleWindowRollupExpectedData() {
        final Seq<String> rows = this.TumbleWindowRollupExpectedData;
        return rows;
    }

    /**
     * Accessor for the {@code HopWindowGroupSetExpectedData} field.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> HopWindowGroupSetExpectedData() {
        final Seq<String> rows = this.HopWindowGroupSetExpectedData;
        return rows;
    }

    /**
     * Accessor for the {@code HopWindowCubeExpectedData} field.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> HopWindowCubeExpectedData() {
        final Seq<String> rows = this.HopWindowCubeExpectedData;
        return rows;
    }

    /**
     * Accessor for the {@code HopWindowRollupExpectedData} field.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> HopWindowRollupExpectedData() {
        final Seq<String> rows = this.HopWindowRollupExpectedData;
        return rows;
    }

    /**
     * Accessor for the {@code CumulateWindowGroupSetExpectedData} field.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> CumulateWindowGroupSetExpectedData() {
        final Seq<String> rows = this.CumulateWindowGroupSetExpectedData;
        return rows;
    }

    /**
     * Accessor for the {@code CumulateWindowCubeExpectedData} field.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> CumulateWindowCubeExpectedData() {
        final Seq<String> rows = this.CumulateWindowCubeExpectedData;
        return rows;
    }

    /**
     * Accessor for the {@code CumulateWindowRollupExpectedData} field.
     *
     * @return the stored sequence of expected result rows
     */
    public Seq<String> CumulateWindowRollupExpectedData() {
        final Seq<String> rows = this.CumulateWindowRollupExpectedData;
        return rows;
    }

    /**
     * Accessor for the zone that {@code before()} installs as the table environment's local
     * time zone.
     *
     * @return the stored {@link ZoneId}
     */
    public ZoneId SHANGHAI_ZONE() {
        final ZoneId zone = this.SHANGHAI_ZONE;
        return zone;
    }

    /**
     * Per-test setup, run after the base class setup.
     *
     * <ol>
     *   <li>Enables exactly-once checkpointing with an interval of 100 and configures a
     *       fixed-delay restart strategy with one attempt and zero delay, plus the async-state
     *       flag of this parameterized run.</li>
     *   <li>Resets {@code FailingCollectionSource}.</li>
     *   <li>Registers the insert-only and changelog data sets and creates the tables
     *       {@code T1} and {@code T1_CDC} on top of them, both with {@code 'failing-source'}
     *       enabled.</li>
     *   <li>Registers {@code concat_distinct_agg}, sets the local time zone and the aggregate
     *       phase strategy on the table config.</li>
     * </ol>
     */
    @Override
    @BeforeEach
    public void before() {
        super.before();
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);

        final Configuration jobConf = new Configuration();
        jobConf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixeddelay");
        jobConf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
        jobConf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(0L));
        jobConf.set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED, this.enableAsyncState);
        this.env().configure(jobConf, Thread.currentThread().getContextClassLoader());
        FailingCollectionSource.reset();

        // Both tables share the same `ts` column type and `rowtime` expression, which depend only
        // on the (final) useTimestampLtz flag.
        final String tsColumnType;
        final String rowtimeExpr;
        if (this.useTimestampLtz) {
            tsColumnType = "BIGINT";
            rowtimeExpr = "TO_TIMESTAMP_LTZ(`ts`, 3)";
        } else {
            tsColumnType = "STRING";
            rowtimeExpr = "TO_TIMESTAMP(`ts`)";
        }

        // Insert-only source table T1.
        final String insertOnlyDataId;
        if (this.useTimestampLtz) {
            insertOnlyDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithLtzInShanghai());
        } else {
            insertOnlyDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        }
        final String insertOnlyDdl =
            "\n"
                + "         |CREATE TABLE T1 (\n"
                + "         | `ts` " + tsColumnType + ",\n"
                + "         | `int` INT,\n"
                + "         | `double` DOUBLE,\n"
                + "         | `float` FLOAT,\n"
                + "         | `bigdec` DECIMAL(10, 2),\n"
                + "         | `string` STRING,\n"
                + "         | `name` STRING,\n"
                + "         | `rowtime` AS\n"
                + "         | " + rowtimeExpr + ",\n"
                + "         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
                + "         |) WITH (\n"
                + "         | 'connector' = 'values',\n"
                + "         | 'data-id' = '" + insertOnlyDataId + "',\n"
                + "         | 'failing-source' = 'true'\n"
                + "         |)\n"
                + "         |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(insertOnlyDdl)).stripMargin());

        // Changelog source table T1_CDC (adds the 'changelog-mode' option).
        final String changelogDataId;
        if (this.useTimestampLtz) {
            changelogDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowChangelogDataWithLtzInShanghai());
        } else {
            changelogDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowChangelogDataWithTimestamp());
        }
        final String changelogDdl =
            "\n"
                + "         |CREATE TABLE T1_CDC (\n"
                + "         | `ts` " + tsColumnType + ",\n"
                + "         | `int` INT,\n"
                + "         | `double` DOUBLE,\n"
                + "         | `float` FLOAT,\n"
                + "         | `bigdec` DECIMAL(10, 2),\n"
                + "         | `string` STRING,\n"
                + "         | `name` STRING,\n"
                + "         | `rowtime` AS\n"
                + "         | " + rowtimeExpr + ",\n"
                + "         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
                + "         |) WITH (\n"
                + "         | 'connector' = 'values',\n"
                + "         | 'data-id' = '" + changelogDataId + "',\n"
                + "         | 'failing-source' = 'true',\n"
                + "         | 'changelog-mode' = 'I,UA,UB,D'\n"
                + "         |)\n"
                + "         |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(changelogDdl)).stripMargin());

        this.tEnv().createFunction("concat_distinct_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
        this.tEnv().getConfig().setLocalTimeZone(this.SHANGHAI_ZONE());
        this.tEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, this.aggPhase);
    }

    @TestTemplate
    public void testEventTimeTumbleWindow() {
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$))))));
        this.verifyWindowAgg("TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)", (Seq<String>)expected, this.verifyWindowAgg$default$3());
    }

    @TestTemplate
    public void testEventTimeTumbleWindowWithOffset() {
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-09T08:00,2020-10-10T08:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"b,2020-10-09T08:00,2020-10-10T08:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"null,2020-10-09T08:00,2020-10-10T08:00,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$)));
        this.verifyWindowAgg("TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '8' HOUR)", (Seq<String>)expected, this.verifyWindowAgg$default$3());
    }

    @TestTemplate
    public void testCascadeEventTimeTumbleWindowWithOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  cnt,\n        |  window_start,\n        |  window_end,\n        |  COUNT(*)\n        |  FROM\n        |  (\n        |    SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) AS cnt\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '8' HOUR))\n        |    GROUP BY `name`, window_start, window_end\n        |) GROUP BY cnt, window_start, window_end\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"0,2020-10-09T08:00,2020-10-10T08:00,1", (List)new .colon.colon((Object)"3,2020-10-09T08:00,2020-10-10T08:00,2", (List)Nil$.MODULE$));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testEventTimeTumbleWindowWithNegativeOffset() {
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-09T16:00,2020-10-10T16:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"b,2020-10-09T16:00,2020-10-10T16:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"null,2020-10-09T16:00,2020-10-10T16:00,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$)));
        this.verifyWindowAgg("TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '1' DAY, INTERVAL '-8' HOUR)", (Seq<String>)expected, this.verifyWindowAgg$default$3());
    }

    /**
     * Verifies a 5-second TUMBLE window over {@code T1} grouped by
     * {@code GROUPING SETS((`name`),())} against {@code TumbleWindowGroupSetExpectedData}.
     */
    @TestTemplate
    public void testEventTimeTumbleWindow_GroupingSets() {
        final String windowTvf = "TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)";
        final String groupingClause = "GROUPING SETS((`name`),())";
        final Seq<String> expectedRows = this.TumbleWindowGroupSetExpectedData();
        this.verifyWindowAggWithGroupingSets(windowTvf, groupingClause, expectedRows);
    }

    /**
     * Verifies a 5-second TUMBLE window over {@code T1} grouped by {@code CUBE(`name`)}
     * against {@code TumbleWindowCubeExpectedData}.
     */
    @TestTemplate
    public void testEventTimeTumbleWindow_Cube() {
        final String windowTvf = "TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)";
        final String groupingClause = "CUBE(`name`)";
        final Seq<String> expectedRows = this.TumbleWindowCubeExpectedData();
        this.verifyWindowAggWithGroupingSets(windowTvf, groupingClause, expectedRows);
    }

    /**
     * Verifies a 5-second TUMBLE window over {@code T1} grouped by {@code ROLLUP(`name`)}
     * against {@code TumbleWindowRollupExpectedData}.
     */
    @TestTemplate
    public void testEventTimeTumbleWindow_Rollup() {
        final String windowTvf = "TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)";
        final String groupingClause = "ROLLUP(`name`)";
        final Seq<String> expectedRows = this.TumbleWindowRollupExpectedData();
        this.verifyWindowAggWithGroupingSets(windowTvf, groupingClause, expectedRows);
    }

    @TestTemplate
    public void testTumbleWindowOutputWindowTime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  window_time,\n        |  COUNT(*)\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end, window_time\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z,4", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,1", (List)Nil$.MODULE$)))))) : (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999,4", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,1", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowGroupOnWindowOnly() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  window_start,\n        |  window_end,\n        |  COUNT(*),\n        |  SUM(`bigdec`),\n        |  MAX(`double`),\n        |  MIN(`float`),\n        |  COUNT(DISTINCT `string`),\n        |  concat_distinct_agg(`string`)\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY window_start, window_end\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", (List)new .colon.colon((Object)"2020-10-10T00:00:05,2020-10-10T00:00:10,3,9.99,6.0,3.0,3,Hello|Hi|Comment#2", (List)new .colon.colon((Object)"2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", (List)new .colon.colon((Object)"2020-10-10T00:00:30,2020-10-10T00:00:35,2,11.10,7.0,3.0,1,Comment#3", (List)Nil$.MODULE$))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowWithoutOutputWindowColumns() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  COUNT(*),\n        |  SUM(`bigdec`),\n        |  MAX(`double`),\n        |  MIN(`float`),\n        |  COUNT(DISTINCT `string`),\n        |  concat_distinct_agg(`string`)\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY window_start, window_end\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"4,11.10,5.0,1.0,2,Hi|Comment#1", (List)new .colon.colon((Object)"3,9.99,6.0,3.0,3,Hello|Hi|Comment#2", (List)new .colon.colon((Object)"1,4.44,4.0,4.0,1,Hi", (List)new .colon.colon((Object)"2,11.10,7.0,3.0,1,Comment#3", (List)Nil$.MODULE$))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    /**
     * Runs {@code verifyWindowAgg} for a HOP window over {@code T1} declared with the intervals
     * {@code '5' SECOND} and {@code '10' SECOND}; each expected row spans a 10-second window and
     * consecutive windows start 5 seconds apart.
     */
    @TestTemplate
    public void testEventTimeHopWindowWithDistinct() {
        final String[] expectedRows = new String[]{
            "a,2020-10-09T23:59:55,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1",
            "a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Comment#2|Hi|Comment#1",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2",
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi",
            "b,2020-10-10T00:00:10,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi",
            "b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3",
            "null,2020-10-10T00:00:25,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null"
        };
        final Seq<String> expected = (Seq<String>)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        final String windowTvf = "HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)";
        this.verifyWindowAgg(windowTvf, expected, this.verifyWindowAgg$default$3());
    }

    /**
     * Runs the shared per-name window aggregation over HOP windows on {@code T1} declared with a
     * third interval of {@code '8' HOUR}, and checks the emitted rows.
     */
    @TestTemplate
    public void testEventTimeHopWindowWithOffset() {
        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-09T08:00,2020-10-10T08:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"a,2020-10-09T20:00,2020-10-10T20:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"b,2020-10-09T08:00,2020-10-10T08:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"b,2020-10-09T20:00,2020-10-10T20:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"null,2020-10-09T08:00,2020-10-10T08:00,1,7.77,7.0,7.0,0,null", (List)new .colon.colon((Object)"null,2020-10-09T20:00,2020-10-10T20:00,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$))))));
        String hopWindowTvf = new StringOps(Predef$.MODULE$.augmentString("\n        |HOP(\n        |  TABLE T1,\n        |  DESCRIPTOR(rowtime),\n        |  INTERVAL '12' HOUR,\n        |  INTERVAL '1' DAY,\n        |  INTERVAL '8' HOUR)\n        |")).stripMargin();
        boolean isCdcSource = this.verifyWindowAgg$default$3();
        this.verifyWindowAgg(hopWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /**
     * Same as the HOP-with-offset case but with a third interval of {@code '-8' HOUR}; checks the
     * emitted rows against the expected result set below.
     */
    @TestTemplate
    public void testEventTimeHopWindowWithNegativeOffset() {
        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-09T04:00,2020-10-10T04:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"a,2020-10-09T16:00,2020-10-10T16:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"b,2020-10-09T04:00,2020-10-10T04:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"b,2020-10-09T16:00,2020-10-10T16:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"null,2020-10-09T04:00,2020-10-10T04:00,1,7.77,7.0,7.0,0,null", (List)new .colon.colon((Object)"null,2020-10-09T16:00,2020-10-10T16:00,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$))))));
        String hopWindowTvf = new StringOps(Predef$.MODULE$.augmentString("\n        |HOP(\n        |  TABLE T1,\n        |  DESCRIPTOR(rowtime),\n        |  INTERVAL '12' HOUR,\n        |  INTERVAL '1' DAY,\n        |  INTERVAL '-8' HOUR)\n        |")).stripMargin();
        boolean isCdcSource = this.verifyWindowAgg$default$3();
        this.verifyWindowAgg(hopWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /** Verifies the HOP window aggregation on {@code T1} grouped by {@code GROUPING SETS((`name`),())}. */
    @TestTemplate
    public void testEventTimeHopWindow_GroupingSets() {
        String hopWindowTvf = "HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)";
        String groupingClause = "GROUPING SETS((`name`),())";
        Seq<String> expectedRows = this.HopWindowGroupSetExpectedData();
        this.verifyWindowAggWithGroupingSets(hopWindowTvf, groupingClause, expectedRows);
    }

    /** Verifies the HOP window aggregation on {@code T1} grouped by {@code CUBE(`name`)}. */
    @TestTemplate
    public void testEventTimeHopWindow_Cube() {
        String hopWindowTvf = "HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)";
        String groupingClause = "CUBE(`name`)";
        Seq<String> expectedRows = this.HopWindowCubeExpectedData();
        this.verifyWindowAggWithGroupingSets(hopWindowTvf, groupingClause, expectedRows);
    }

    /** Verifies the HOP window aggregation on {@code T1} grouped by {@code ROLLUP(`name`)}. */
    @TestTemplate
    public void testEventTimeHopWindow_Rollup() {
        String hopWindowTvf = "HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)";
        String groupingClause = "ROLLUP(`name`)";
        Seq<String> expectedRows = this.HopWindowRollupExpectedData();
        this.verifyWindowAggWithGroupingSets(hopWindowTvf, groupingClause, expectedRows);
    }

    /**
     * Runs the shared per-name window aggregation over CUMULATE windows on {@code T1} and checks
     * the emitted rows against the expected result set below.
     */
    @TestTemplate
    public void testEventTimeCumulateWindow() {
        Seq expectedRows = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", "a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", "a,2020-10-10T00:00,2020-10-10T00:00:15,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi", "b,2020-10-10T00:00:15,2020-10-10T00:00:30,1,4.44,4.0,4.0,1,Hi", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3", "b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null", "null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null", "null,2020-10-10T00:00:30,2020-10-10T00:00:45,1,7.77,7.0,7.0,0,null"}));
        String cumulateWindowTvf = new StringOps(Predef$.MODULE$.augmentString("\n        |CUMULATE(\n        |  TABLE T1,\n        |  DESCRIPTOR(rowtime),\n        |  INTERVAL '5' SECOND,\n        |  INTERVAL '15' SECOND)\n        |")).stripMargin();
        boolean isCdcSource = this.verifyWindowAgg$default$3();
        this.verifyWindowAgg(cumulateWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /**
     * Runs the shared per-name window aggregation over CUMULATE windows on {@code T1} declared
     * with a third interval of {@code '8' HOUR}, and checks the emitted rows.
     */
    @TestTemplate
    public void testEventTimeCumulateWindowWithOffset() {
        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-09T08:00,2020-10-10T08:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"b,2020-10-09T08:00,2020-10-10T08:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"null,2020-10-09T08:00,2020-10-10T08:00,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$)));
        String cumulateWindowTvf = new StringOps(Predef$.MODULE$.augmentString("\n        |CUMULATE(\n        |  TABLE T1,\n        |  DESCRIPTOR(rowtime),\n        |  INTERVAL '12' HOUR,\n        |  INTERVAL '1' DAY,\n        |  INTERVAL '8' HOUR)\n        |")).stripMargin();
        boolean isCdcSource = this.verifyWindowAgg$default$3();
        this.verifyWindowAgg(cumulateWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /**
     * Same as the CUMULATE-with-offset case but with a third interval of {@code '-8' HOUR}; checks
     * the emitted rows against the expected result set below.
     */
    @TestTemplate
    public void testEventTimeCumulateWindowWithNegativeOffset() {
        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-09T16:00,2020-10-10T04:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"a,2020-10-09T16:00,2020-10-10T16:00,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"b,2020-10-09T16:00,2020-10-10T04:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"b,2020-10-09T16:00,2020-10-10T16:00,4,14.43,6.0,3.0,3,Hello|Hi|Comment#3", (List)new .colon.colon((Object)"null,2020-10-09T16:00,2020-10-10T04:00,1,7.77,7.0,7.0,0,null", (List)new .colon.colon((Object)"null,2020-10-09T16:00,2020-10-10T16:00,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$))))));
        String cumulateWindowTvf = new StringOps(Predef$.MODULE$.augmentString("\n        |CUMULATE(\n        |  TABLE T1,\n        |  DESCRIPTOR(rowtime),\n        |  INTERVAL '12' HOUR,\n        |  INTERVAL '1' DAY,\n        |  INTERVAL '-8' HOUR)\n        |")).stripMargin();
        boolean isCdcSource = this.verifyWindowAgg$default$3();
        this.verifyWindowAgg(cumulateWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /** Verifies the CUMULATE window aggregation on {@code T1} grouped by {@code GROUPING SETS((`name`),())}. */
    @TestTemplate
    public void testEventTimeCumulateWindow_GroupingSets() {
        String cumulateWindowTvf = new StringOps(Predef$.MODULE$.augmentString("\n        |CUMULATE(\n        |  TABLE T1,\n        |  DESCRIPTOR(rowtime),\n        |  INTERVAL '5' SECOND,\n        |  INTERVAL '15' SECOND)\n        |")).stripMargin();
        String groupingClause = "GROUPING SETS((`name`),())";
        Seq<String> expectedRows = this.CumulateWindowGroupSetExpectedData();
        this.verifyWindowAggWithGroupingSets(cumulateWindowTvf, groupingClause, expectedRows);
    }

    /** Verifies the CUMULATE window aggregation on {@code T1} grouped by {@code Cube(`name`)}. */
    @TestTemplate
    public void testEventTimeCumulateWindow_Cube() {
        String cumulateWindowTvf = new StringOps(Predef$.MODULE$.augmentString("\n        |CUMULATE(\n        |  TABLE T1,\n        |  DESCRIPTOR(rowtime),\n        |  INTERVAL '5' SECOND,\n        |  INTERVAL '15' SECOND)\n        |")).stripMargin();
        String groupingClause = "Cube(`name`)";
        Seq<String> expectedRows = this.CumulateWindowCubeExpectedData();
        this.verifyWindowAggWithGroupingSets(cumulateWindowTvf, groupingClause, expectedRows);
    }

    /** Verifies the CUMULATE window aggregation on {@code T1} grouped by {@code ROLLUP(`name`)}. */
    @TestTemplate
    public void testEventTimeCumulateWindow_Rollup() {
        String cumulateWindowTvf = new StringOps(Predef$.MODULE$.augmentString("\n        |CUMULATE(\n        |  TABLE T1,\n        |  DESCRIPTOR(rowtime),\n        |  INTERVAL '5' SECOND,\n        |  INTERVAL '15' SECOND)\n        |")).stripMargin();
        String groupingClause = "ROLLUP(`name`)";
        Seq<String> expectedRows = this.CumulateWindowRollupExpectedData();
        this.verifyWindowAggWithGroupingSets(cumulateWindowTvf, groupingClause, expectedRows);
    }

    /**
     * Selects {@code window_time} together with aggregates aliased {@code start_time} and
     * {@code end_time} from a TUMBLE window on {@code T1}, and compares the sorted sink output with
     * the expected rows. The expected rows differ depending on {@code useTimestampLtz}.
     */
    @TestTemplate
    public void testFieldNameConflict() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  window_time,\n        |  MIN(rowtime) as start_time,\n        |  MAX(rowtime) as end_time\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY window_start, window_end, window_time\n      ")).stripMargin();
        TestingAppendSink appendSink = new TestingAppendSink();
        Table queryResult = this.tEnv().sqlQuery(query);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(queryResult).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        Seq expectedRows;
        if (this.useTimestampLtz) {
            // Expected rows used when useTimestampLtz is set: values carry a trailing 'Z'.
            expectedRows = (Seq)new .colon.colon((Object)"2020-10-09T16:00:04.999Z,2020-10-09T16:00:01Z,2020-10-09T16:00:04Z", (List)new .colon.colon((Object)"2020-10-09T16:00:09.999Z,2020-10-09T16:00:06Z,2020-10-09T16:00:08Z", (List)new .colon.colon((Object)"2020-10-09T16:00:19.999Z,2020-10-09T16:00:16Z,2020-10-09T16:00:16Z", (List)new .colon.colon((Object)"2020-10-09T16:00:34.999Z,2020-10-09T16:00:32Z,2020-10-09T16:00:34Z", (List)Nil$.MODULE$))));
        } else {
            expectedRows = (Seq)new .colon.colon((Object)"2020-10-10T00:00:04.999,2020-10-10T00:00:01,2020-10-10T00:00:04", (List)new .colon.colon((Object)"2020-10-10T00:00:09.999,2020-10-10T00:00:06,2020-10-10T00:00:08", (List)new .colon.colon((Object)"2020-10-10T00:00:19.999,2020-10-10T00:00:16,2020-10-10T00:00:16", (List)new .colon.colon((Object)"2020-10-10T00:00:34.999,2020-10-10T00:00:32,2020-10-10T00:00:34", (List)Nil$.MODULE$))));
        }

        // Both sides are sorted so the comparison does not depend on emission order.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedRows.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Creates the {@code proctime_src} table over registered test data and executes a two-level
     * aggregation on a processing-time TUMBLE window. The job is only run to completion; no
     * assertion is made on the collected results.
     */
    @TestTemplate
    public void testRelaxFormProctimeCascadeWindowAgg() {
        String registeredDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        String createTableDdl = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(615).append("\n                       |CREATE TABLE proctime_src (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `double` DOUBLE,\n                       | `float` FLOAT,\n                       | `bigdec` DECIMAL(10, 2),\n                       | `string` STRING,\n                       | `name` STRING,\n                       | `proctime` AS PROCTIME()\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '").append(registeredDataId).append("',\n                       | 'failing-source' = 'true'\n                       |)\n                       |").toString())).stripMargin();
        this.tEnv().executeSql(createTableDdl);

        String cascadeQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  window_start,\n        |  window_end,\n        |  COUNT(*)\n        |FROM\n        |(\n        |    SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) AS cnt\n        |    FROM TABLE(\n        |      TUMBLE(TABLE proctime_src, DESCRIPTOR(proctime), INTERVAL '1' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) GROUP BY window_start, window_end\n        ")).stripMargin();
        TestingRetractSink retractSink = new TestingRetractSink();
        Table queryResult = this.tEnv().sqlQuery(cascadeQuery);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(queryResult)
            .toRetractStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)retractSink);
        this.env().execute();
    }

    /**
     * Runs the shared per-name window aggregation over TUMBLE windows on {@code T1_CDC} with the
     * CDC flag set, and checks the emitted rows.
     */
    @TestTemplate
    public void testEventTimeTumbleWindowWithCDCSource() {
        String tumbleWindowTvf = "TUMBLE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)";
        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,3,29.99,22.0,2.0,2", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,6.0,3.0,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1", (List)Nil$.MODULE$))));
        boolean isCdcSource = true;
        this.verifyWindowAgg(tumbleWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /**
     * Runs the shared per-name window aggregation over HOP windows on {@code T1_CDC} with the CDC
     * flag set, and checks the emitted rows.
     */
    @TestTemplate
    public void testEventTimeHopWindowWithCDCSource() {
        String hopWindowTvf = "HOP(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)";
        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-09T23:59:55,2020-10-10T00:00:05,3,29.99,22.0,2.0,2", (List)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:10,5,38.87,22.0,2.0,4", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:15,2,6.66,6.0,3.0,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:10,2020-10-10T00:00:20,1,4.44,4.0,4.0,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1", (List)Nil$.MODULE$)))))));
        boolean isCdcSource = true;
        this.verifyWindowAgg(hopWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /**
     * Runs the shared per-name window aggregation over CUMULATE windows on {@code T1_CDC} with the
     * CDC flag set, and checks the emitted rows.
     */
    @TestTemplate
    public void testEventTimeCumulateWindowWithCDCSource() {
        String cumulateWindowTvf = "CUMULATE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '15' SECOND)";
        Seq expectedRows = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,3,29.99,22.0,2.0,2", "a,2020-10-10T00:00,2020-10-10T00:00:10,5,38.87,22.0,2.0,4", "a,2020-10-10T00:00,2020-10-10T00:00:15,5,38.87,22.0,2.0,4", "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2", "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1", "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1", "b,2020-10-10T00:00:15,2020-10-10T00:00:30,1,4.44,4.0,4.0,1"}));
        boolean isCdcSource = true;
        this.verifyWindowAgg(cumulateWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /**
     * Registers a three-row changelog (two {@code +I} rows and one {@code -D} row), exposes it as
     * table {@code MyTable}, aggregates it over HOP windows and compares the sorted sink output
     * with the expected rows, which include negative counts and sums for the last window.
     */
    @TestTemplate
    public void testRetractPreviousSlicingStateWithSlicingWindow() {
        Seq<Row> changelogRows = (Seq<Row>)((Seq)new .colon.colon((Object)TestValuesTableFactory.changelogRow("+I", "2020-10-10 00:00:01", BoxesRunTime.boxToInteger((int)1), "s1", "a"), (List)new .colon.colon((Object)TestValuesTableFactory.changelogRow("+I", "2020-10-10 00:00:04", BoxesRunTime.boxToInteger((int)1), "s2", "a"), (List)new .colon.colon((Object)TestValuesTableFactory.changelogRow("-D", "2020-10-10 00:00:06", BoxesRunTime.boxToInteger((int)3), "s3", "a"), (List)Nil$.MODULE$))));
        String registeredDataId = TestValuesTableFactory.registerData(changelogRows);
        String createTableDdl = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(627).append("\n                       |CREATE TABLE MyTable (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `string` STRING,\n                       | `name` STRING,\n                       | `rowtime` AS TO_TIMESTAMP(`ts`),\n                       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '").append(registeredDataId).append("',\n                       | 'failing-source' = 'true',\n                       | 'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |").toString())).stripMargin();
        this.tEnv().executeSql(createTableDdl);

        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(*),\n        |  SUM(`int`),\n        |  COUNT(DISTINCT `string`)\n        |FROM TABLE(\n        |   HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |GROUP BY `name`, window_start, window_end\n      ")).stripMargin();
        TestingAppendSink appendSink = new TestingAppendSink();
        Table queryResult = this.tEnv().sqlQuery(query);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(queryResult).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-09T23:59:55,2020-10-10T00:00:05,2,2,2", (List)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:10,1,-1,2", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:15,-1,-3,0", (List)Nil$.MODULE$)));
        // Both sides are sorted so the comparison does not depend on emission order.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedRows.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Runs the shared per-name window aggregation over SESSION windows on {@code T1} partitioned
     * by {@code name}, and checks the emitted rows.
     */
    @TestTemplate
    public void testEventTimeSessionWindow() {
        String sessionWindowTvf = "SESSION(TABLE T1 PARTITION BY `name`, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)";
        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00:01,2020-10-10T00:00:13,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:06,2020-10-10T00:00:12,2,6.66,6.0,3.0,2,Hello|Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00:16,2020-10-10T00:00:21,1,4.44,4.0,4.0,1,Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00:34,2020-10-10T00:00:39,1,3.33,3.0,3.0,1,Comment#3", (List)new .colon.colon((Object)"null,2020-10-10T00:00:32,2020-10-10T00:00:37,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$)))));
        boolean isCdcSource = this.verifyWindowAgg$default$3();
        this.verifyWindowAgg(sessionWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /**
     * Aggregates per {@code name} over a SESSION window TVF that is wrapped in a sub-query with a
     * {@code WHERE window_start > ...} filter, then compares the sorted sink output with the
     * expected rows.
     */
    @TestTemplate
    public void testEventTimeSessionWindowWithTVFNotPullUpIntoWindowAgg() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(*),\n        |  SUM(`bigdec`),\n        |  MAX(`double`),\n        |  MIN(`float`),\n        |  COUNT(DISTINCT `string`),\n        |  concat_distinct_agg(`string`)\n        |FROM (\n        | SELECT * FROM TABLE(\n        |   SESSION(TABLE T1 PARTITION BY `name`, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |   WHERE window_start > TIMESTAMP '2000-01-01 10:10:00.000'\n        |)\n        |GROUP BY `name`, window_start, window_end\n      ")).stripMargin();
        TestingAppendSink appendSink = new TestingAppendSink();
        Table queryResult = this.tEnv().sqlQuery(query);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(queryResult).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00:01,2020-10-10T00:00:13,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:06,2020-10-10T00:00:12,2,6.66,6.0,3.0,2,Hello|Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00:16,2020-10-10T00:00:21,1,4.44,4.0,4.0,1,Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00:34,2020-10-10T00:00:39,1,3.33,3.0,3.0,1,Comment#3", (List)new .colon.colon((Object)"null,2020-10-10T00:00:32,2020-10-10T00:00:37,1,7.77,7.0,7.0,0,null", (List)Nil$.MODULE$)))));
        // Both sides are sorted so the comparison does not depend on emission order.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedRows.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Runs the shared per-name window aggregation over SESSION windows on {@code T1_CDC} with the
     * CDC flag set, and checks the emitted rows.
     */
    @TestTemplate
    public void testEventTimeSessionWindowWithCDCSource() {
        String sessionWindowTvf = "SESSION(TABLE T1_CDC PARTITION BY `name`, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)";
        Seq expectedRows = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00:01,2020-10-10T00:00:13,5,38.87,22.0,2.0,4", (List)new .colon.colon((Object)"b,2020-10-10T00:00:06,2020-10-10T00:00:12,2,6.66,6.0,3.0,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:16,2020-10-10T00:00:21,1,4.44,4.0,4.0,1", (List)Nil$.MODULE$)));
        boolean isCdcSource = true;
        this.verifyWindowAgg(sessionWindowTvf, (Seq<String>)expectedRows, isCdcSource);
    }

    /**
     * Feeds seven (Long, Integer, String) tuples through a failing data source, registers the
     * resulting stream as table {@code MyTable} with fields {@code a}, {@code b}, {@code c} and a
     * {@code rowtime} attribute, runs a SESSION-window query computing {@code COUNT(DISTINCT b)}
     * per {@code c}, and compares the sorted sink output with three expected rows.
     */
    @TestTemplate
    public void testDistinctAggWithMergeOnEventTimeSessionWindow() {
        // Input tuples; the first component is not in ascending order (..., 10, 9, 4, 16).
        .colon.colon sessionWindowTestData = new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)1L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)2L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)8L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)10L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)9L), (Object)BoxesRunTime.boxToInteger((int)9), (Object)"Hello World"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)16L), (Object)BoxesRunTime.boxToInteger((int)16), (Object)"Hello"), (List)Nil$.MODULE$)))))));
        // The anonymous CaseClassTypeInfo below supplies the type information for the Tuple3 elements.
        // NOTE(review): this looks like decompiler output of a Scala type-info macro expansion — confirm against the Scala source.
        SingleOutputStreamOperator stream = this.failingDataSource(sessionWindowTestData, new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor exposing the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$1 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (arity-many) and wraps them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // NOTE(review): `fieldSerializers$1` appears to be the decompiler's name for the captured `fieldSerializers` array — confirm.
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // `unused` is never read in this method; the value returned is the separate serializer constructed at the end.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    // Rebuilds a Tuple3 from the three field values (long, int, String).
                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic hook used to deserialize the serializable lambda in createSerializer.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.runtime.stream.sql.WindowAggregateITCase$$anon$1 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L));
        // Expose the stream as a table with fields a, b, c plus the rowtime attribute.
        Table table = org.apache.flink.table.api.bridge.scala.package$.MODULE$.dataStreamConversions((DataStream)stream).toTable(this.tEnv(), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}));
        this.tEnv().registerTable("MyTable", table);
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT c,\n        |   COUNT(DISTINCT b),\n        |   window_end\n        |FROM TABLE(\n        |  SESSION(TABLE MyTable PARTITION BY c, DESCRIPTOR(rowtime), INTERVAL '0.005' SECOND))\n        |GROUP BY c, window_start, window_end\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sqlQuery)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"Hello World,1,1970-01-01T00:00:00.014", (List)new .colon.colon((Object)"Hello,1,1970-01-01T00:00:00.021", (List)new .colon.colon((Object)"Hello,3,1970-01-01T00:00:00.015", (List)Nil$.MODULE$)));
        // Both sides are sorted so the comparison does not depend on emission order.
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    /**
     * Runs four {@code PERCENTILE} variants (single/array percentage, with and without the
     * {@code int} column as an extra argument) over a TUMBLE window on {@code T1_CDC}, flattens
     * the array results in an outer query, and checks each sorted result row: the three key
     * columns by exact string equality and the eight value columns numerically within a tolerance.
     */
    @TestTemplate
    public void testPercentileOnEventTimeTumbleWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  PERCENTILE(`double`, 0.5) as `swo`,\n        |  PERCENTILE(`double`, 0.5, `int`) as `sw`,\n        |  PERCENTILE(`double`, ARRAY[0.5, 0.2, 0.6]) as `mwo`,\n        |  PERCENTILE(`double`, ARRAY[0.5, 0.2, 0.6], `int`) as `mw`\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n      ")).stripMargin();
        // Outer query spreads the two array results into three scalar columns each.
        String outer = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(220).append("\n         |select\n         | `name`,\n         | window_start,\n         | window_end,\n         | `swo`,\n         | `sw`,\n         | `mwo`[1], `mwo`[2], `mwo`[3],\n         | `mw`[1], `mw`[2], `mw`[3]\n         |FROM (").append(sql).append(")\n    ").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(outer)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        // Four expected key triples: (name, window_start, window_end).
        .colon.colon expected_key = new .colon.colon((Object)new .colon.colon((Object)"a", (List)new .colon.colon((Object)"2020-10-10T00:00", (List)new .colon.colon((Object)"2020-10-10T00:00:05", (List)Nil$.MODULE$))), (List)new .colon.colon((Object)new .colon.colon((Object)"a", (List)new .colon.colon((Object)"2020-10-10T00:00:05", (List)new .colon.colon((Object)"2020-10-10T00:00:10", (List)Nil$.MODULE$))), (List)new .colon.colon((Object)new .colon.colon((Object)"b", (List)new .colon.colon((Object)"2020-10-10T00:00:05", (List)new .colon.colon((Object)"2020-10-10T00:00:10", (List)Nil$.MODULE$))), (List)new .colon.colon((Object)new .colon.colon((Object)"b", (List)new .colon.colon((Object)"2020-10-10T00:00:15", (List)new .colon.colon((Object)"2020-10-10T00:00:20", (List)Nil$.MODULE$))), (List)Nil$.MODULE$))));
        // Four lists of eight expected values; a NaN entry means the actual cell must be the text "null"
        // (see $anonfun$testPercentileOnEventTimeTumbleWindow$5). The second list is all NaN, the fourth all 4.0.
        .colon.colon expected_value = new .colon.colon((Object)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapDoubleArray(new double[]{5.0, 22.0, 5.0, 3.2, 8.4, 22.0, 5.0, 22.0})), (List)new .colon.colon((Object)((List)List$.MODULE$.fill(8, (Function0)(JFunction0.mcD.sp & Serializable & scala.Serializable)() -> Double.NaN)), (List)new .colon.colon((Object)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapDoubleArray(new double[]{4.5, 6.0, 4.5, 3.6, 4.8, 6.0, 3.0, 6.0})), (List)new .colon.colon((Object)((List)List$.MODULE$.fill(8, (Function0)(JFunction0.mcD.sp & Serializable & scala.Serializable)() -> 4.0)), (List)Nil$.MODULE$))));
        // Number of key columns; value columns in a result row start at this index.
        int key_length = ((LinearSeqOptimized)expected_key.head()).length();
        Percentage ERROR_RATE = Percentage.withPercentage((double)1.0E-6);
        // Sorted so that result row i lines up with expected_key / expected_value entry i.
        List result = (List)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$);
        // Only rows actually present in `result` are checked; the row count itself is not asserted here.
        result.indices().foreach$mVc$sp((Function1)((JFunction1.mcVI.sp & Serializable & scala.Serializable)arg_0 -> WindowAggregateITCase.$anonfun$testPercentileOnEventTimeTumbleWindow$3(result, (List)expected_key, (List)expected_value, key_length, ERROR_RATE, arg_0)));
    }

    /**
     * Builds the per-{@code name} window aggregation query over the given window TVF, executes it
     * and verifies the result.
     *
     * <p>The data-view aggregates ({@code COUNT(DISTINCT ...)} and, for non-CDC sources,
     * {@code concat_distinct_agg}) are only added when async state is disabled. When async state
     * is enabled, the same number of trailing columns is dropped from the expected rows instead.
     *
     * @param tvfFromClause window TVF expression placed inside {@code FROM TABLE(...)}
     * @param allExpectedData expected rows including the data-view aggregate columns
     * @param isCdcSource whether the source is a CDC table; selects one data-view aggregate column
     *     instead of two
     */
    private void verifyWindowAgg(String tvfFromClause, Seq<String> allExpectedData, boolean isCdcSource) {
        String dataViewAggregates;
        int trailingColumnsToDrop;
        if (isCdcSource) {
            dataViewAggregates = new StringOps(Predef$.MODULE$.augmentString("\n          |,COUNT(DISTINCT `string`)\n          |")).stripMargin();
            trailingColumnsToDrop = 1;
        } else {
            dataViewAggregates = new StringOps(Predef$.MODULE$.augmentString("\n          |,COUNT(DISTINCT `string`)\n          |,concat_distinct_agg(`string`)\n          |")).stripMargin();
            trailingColumnsToDrop = 2;
        }
        String optionalAggregates = this.enableAsyncState ? "" : dataViewAggregates;
        String queryTemplate = new StringBuilder(363)
            .append("\n         |SELECT\n         |  `name`\n         |  ,window_start\n         |  ,window_end\n         |  ,COUNT(*)\n         |  ,SUM(`bigdec`)\n         |  ,MAX(`double`)\n         |  ,MIN(`float`)\n         |  -- agg function with data view does not support async state yet\n         |  ")
            .append((Object)optionalAggregates)
            .append("\n         |FROM TABLE(")
            .append(tvfFromClause)
            .append(")\n         |GROUP BY `name`, window_start, window_end\n         |")
            .toString();
        String query = new StringOps(Predef$.MODULE$.augmentString(queryTemplate)).stripMargin();
        this.executeAndVerify(query, allExpectedData, trailingColumnsToDrop);
    }

    /**
     * Supplies the value that callers in this class pass as the third argument
     * ({@code isCdcSource}) of {@code verifyWindowAgg} when they do not pass {@code true}
     * explicitly.
     *
     * <p>NOTE(review): the {@code $default$3} suffix suggests a compiler-generated default-argument
     * accessor — confirm against the Scala source.
     *
     * @return always {@code false}
     */
    private boolean verifyWindowAgg$default$3() {
        return false;
    }

    /**
     * Builds a window aggregation query that selects {@code GROUPING_ID(`name`)} and groups by the
     * given grouping clause plus the window bounds, then executes it and verifies the result.
     *
     * <p>The two data-view aggregates are only added when async state is disabled; when it is
     * enabled, the last two columns are dropped from the expected rows instead.
     *
     * @param tvfFromClause window TVF expression placed inside {@code FROM TABLE(...)}
     * @param groupingSetClause grouping expression inserted right after {@code GROUP BY}
     * @param allExpectedData expected rows including the two data-view aggregate columns
     */
    private void verifyWindowAggWithGroupingSets(String tvfFromClause, String groupingSetClause, Seq<String> allExpectedData) {
        String dataViewAggregates = new StringOps(Predef$.MODULE$.augmentString("\n        |,COUNT(DISTINCT `string`)\n        |,concat_distinct_agg(`string`)\n        |")).stripMargin();
        String optionalAggregates = this.enableAsyncState ? "" : dataViewAggregates;
        String queryTemplate = new StringBuilder(390)
            .append("\n         |SELECT\n         |  GROUPING_ID(`name`),\n         |  `name`\n         |  ,window_start\n         |  ,window_end\n         |  ,COUNT(*)\n         |  ,SUM(`bigdec`)\n         |  ,MAX(`double`)\n         |  ,MIN(`float`)\n         |  -- agg function with data view does not support async state yet\n         |  ")
            .append((Object)optionalAggregates)
            .append("\n         |FROM TABLE(")
            .append(tvfFromClause)
            .append(")\n         |GROUP BY ")
            .append(groupingSetClause)
            .append(", window_start, window_end\n         |")
            .toString();
        String query = new StringOps(Predef$.MODULE$.augmentString(queryTemplate)).stripMargin();
        int trailingColumnsToDrop = 2;
        this.executeAndVerify(query, allExpectedData, trailingColumnsToDrop);
    }

    /**
     * Executes the query into an append sink and asserts that the sorted sink output equals the
     * sorted expected rows, after trimming the expected rows for async-state runs.
     *
     * @param query SQL query to execute
     * @param allExpectedData expected rows before any trimming
     * @param numToDropWithAsyncState number of trailing columns removed from each expected row when
     *     async state is enabled
     */
    private void executeAndVerify(String query, Seq<String> allExpectedData, int numToDropWithAsyncState) {
        TestingAppendSink appendSink = new TestingAppendSink();
        Table queryResult = this.tEnv().sqlQuery(query);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(queryResult).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        Seq<String> expectedRows = this.filterTailDataIfNecessary(allExpectedData, numToDropWithAsyncState);
        // Both sides are sorted so the comparison does not depend on emission order.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedRows.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Trims the expected rows for async-state runs.
     *
     * <p>When async state is disabled the input is returned unchanged. Otherwise each row is split
     * on commas, its last {@code numToDropWithAsyncState} columns are removed (a row with fewer
     * columns than that becomes empty), and the remaining columns are re-joined with commas.
     *
     * @param data expected rows as comma-separated strings
     * @param numToDropWithAsyncState number of trailing columns to remove from each row
     * @return the original or the trimmed rows
     */
    private Seq<String> filterTailDataIfNecessary(Seq<String> data, int numToDropWithAsyncState) {
        if (this.enableAsyncState) {
            Object trimmedColumns = data.map((Function1 & Serializable & scala.Serializable)row -> {
                String[] columns = row.split(",");
                if (columns.length < numToDropWithAsyncState) {
                    return (String[])Array$.MODULE$.empty(ClassTag$.MODULE$.apply(String.class));
                }
                return (String[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])columns)).dropRight(numToDropWithAsyncState);
            }, Seq$.MODULE$.canBuildFrom());
            return (Seq)((TraversableLike)trimmedColumns).map((Function1 & Serializable & scala.Serializable)kept -> new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])kept)).mkString(","), Seq$.MODULE$.canBuildFrom());
        }
        return data;
    }

    /**
     * Synthetic lambda body: asserts that column {@code j} of the split result row equals element
     * {@code j} of the {@code i$1}-th expected key entry.
     */
    public static final /* synthetic */ AbstractStringAssert $anonfun$testPercentileOnEventTimeTumbleWindow$4(String[] actual$1, List expected_key$1, int i$1, int j) {
        return Assertions.assertThat((String)actual$1[j]).isEqualTo((String)((LinearSeqOptimized)expected_key$1.apply(i$1)).apply(j));
    }

    /**
     * Synthetic lambda body: checks value column {@code j} of the split result row against element
     * {@code j} of the {@code i$1}-th expected value list.
     *
     * <p>A NaN expectation requires the actual cell to be the text {@code "null"}; any other
     * expectation requires the actual cell, parsed as a double, to be close to it within
     * {@code ERROR_RATE$1}. Value columns start at index {@code key_length$1} in the result row.
     */
    public static final /* synthetic */ AbstractAssert $anonfun$testPercentileOnEventTimeTumbleWindow$5(List expected_value$1, int i$1, String[] actual$1, int key_length$1, Percentage ERROR_RATE$1, int j) {
        double expectedValue = BoxesRunTime.unboxToDouble((Object)((LinearSeqOptimized)expected_value$1.apply(i$1)).apply(j));
        String actualCell = actual$1[j + key_length$1];
        if (Double.isNaN(expectedValue)) {
            return Assertions.assertThat((String)actualCell).isEqualTo("null");
        }
        double actualValue = new StringOps(Predef$.MODULE$.augmentString(actualCell)).toDouble();
        return Assertions.assertThat((double)actualValue).isCloseTo(expectedValue, ERROR_RATE$1);
    }

    /**
     * Synthetic lambda body: verifies result row {@code i}.
     *
     * <p>The row is split on {@code ","}. Every index of the expected key row is then checked
     * through {@code $anonfun$testPercentileOnEventTimeTumbleWindow$4}, followed by every index
     * of the expected value row through {@code $anonfun$testPercentileOnEventTimeTumbleWindow$5}.
     *
     * @param result$1 actual result rows as comma-separated strings
     * @param expected_key$1 expected key rows
     * @param expected_value$1 expected value rows
     * @param key_length$1 offset of the first value field within a split row
     * @param ERROR_RATE$1 percentage tolerance for the numeric comparison
     * @param i index of the row being checked
     */
    public static final /* synthetic */ void $anonfun$testPercentileOnEventTimeTumbleWindow$3(List result$1, List expected_key$1, List expected_value$1, int key_length$1, Percentage ERROR_RATE$1, int i) {
        String[] columns = ((String)result$1.apply(i)).split(",");

        // Key fields first ...
        SeqLike keysForRow = (SeqLike)expected_key$1.apply(i);
        keysForRow.indices().foreach((Function1 & Serializable & scala.Serializable)keyIdx -> WindowAggregateITCase.$anonfun$testPercentileOnEventTimeTumbleWindow$4(columns, expected_key$1, i, BoxesRunTime.unboxToInt((Object)keyIdx)));

        // ... then the value fields that follow them in the row.
        SeqLike valuesForRow = (SeqLike)expected_value$1.apply(i);
        valuesForRow.indices().foreach((Function1 & Serializable & scala.Serializable)valueIdx -> WindowAggregateITCase.$anonfun$testPercentileOnEventTimeTumbleWindow$5(expected_value$1, i, columns, key_length$1, ERROR_RATE$1, BoxesRunTime.unboxToInt((Object)valueIdx)));
    }

    /**
     * Creates the test instance for one parameter combination and initialises the expected-result
     * tables used by the window aggregation tests.
     *
     * <p>NOTE(review): the field stores appear before the {@code super(state)} call because this
     * file is decompiler output; the statement order is kept exactly as emitted.
     *
     * @param aggPhase stored in {@code this.aggPhase}
     * @param state passed to the superclass constructor
     * @param useTimestampLtz stored in {@code this.useTimestampLtz}
     * @param enableAsyncState stored in {@code this.enableAsyncState}; read by
     *     {@code filterTailDataIfNecessary}
     */
    public WindowAggregateITCase(AggregatePhaseStrategy aggPhase, StreamingWithStateTestBase.StateBackendMode state, boolean useTimestampLtz, boolean enableAsyncState) {
        this.aggPhase = aggPhase;
        this.useTimestampLtz = useTimestampLtz;
        this.enableAsyncState = enableAsyncState;
        super(state);
        // Expected rows for the tumble-window grouping-sets case (presumably one comma-separated
        // result row per entry -- confirm against the tests that consume it).
        this.TumbleWindowGroupSetExpectedData = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"0,a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", "0,a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2", "0,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", "0,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", "0,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", "0,null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null", "1,null,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", "1,null,2020-10-10T00:00:05,2020-10-10T00:00:10,3,9.99,6.0,3.0,3,Hello|Hi|Comment#2", "1,null,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", "1,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2,11.10,7.0,3.0,1,Comment#3"}));
        // The cube and rollup variants reuse the grouping-sets expectation.
        this.TumbleWindowCubeExpectedData = this.TumbleWindowGroupSetExpectedData();
        this.TumbleWindowRollupExpectedData = this.TumbleWindowGroupSetExpectedData();
        // Expected rows for the hop-window grouping-sets case.
        this.HopWindowGroupSetExpectedData = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"0,a,2020-10-09T23:59:55,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", "0,a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Comment#2|Hi|Comment#1", "0,a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2", "0,b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", "0,b,2020-10-10T00:00:05,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi", "0,b,2020-10-10T00:00:10,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", "0,b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi", "0,b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", "0,b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3", "0,null,2020-10-10T00:00:25,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null", "0,null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null", "1,null,2020-10-09T23:59:55,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", "1,null,2020-10-10T00:00,2020-10-10T00:00:10,8,26.64,6.0,1.0,4,Hello|Hi|Comment#2|Comment#1", "1,null,2020-10-10T00:00:05,2020-10-10T00:00:15,3,9.99,6.0,3.0,3,Hello|Hi|Comment#2", "1,null,2020-10-10T00:00:10,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", "1,null,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi", "1,null,2020-10-10T00:00:25,2020-10-10T00:00:35,2,11.10,7.0,3.0,1,Comment#3", "1,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2,11.10,7.0,3.0,1,Comment#3"}));
        // The cube and rollup variants reuse the grouping-sets expectation.
        this.HopWindowCubeExpectedData = this.HopWindowGroupSetExpectedData();
        this.HopWindowRollupExpectedData = this.HopWindowGroupSetExpectedData();
        // Expected rows for the cumulate-window grouping-sets case.
        this.CumulateWindowGroupSetExpectedData = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"0,a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", "0,a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", "0,a,2020-10-10T00:00,2020-10-10T00:00:15,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", "0,b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", "0,b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi", "0,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", "0,b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi", "0,b,2020-10-10T00:00:15,2020-10-10T00:00:30,1,4.44,4.0,4.0,1,Hi", "0,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", "0,b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3", "0,b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3", "0,null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null", "0,null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null", "0,null,2020-10-10T00:00:30,2020-10-10T00:00:45,1,7.77,7.0,7.0,0,null", "1,null,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1", "1,null,2020-10-10T00:00,2020-10-10T00:00:10,8,26.64,6.0,1.0,4,Hi|Comment#1|Hello|Comment#2", "1,null,2020-10-10T00:00,2020-10-10T00:00:15,8,26.64,6.0,1.0,4,Hi|Comment#1|Hello|Comment#2", "1,null,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi", "1,null,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi", "1,null,2020-10-10T00:00:15,2020-10-10T00:00:30,1,4.44,4.0,4.0,1,Hi", "1,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2,11.10,7.0,3.0,1,Comment#3", "1,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2,11.10,7.0,3.0,1,Comment#3", "1,null,2020-10-10T00:00:30,2020-10-10T00:00:45,2,11.10,7.0,3.0,1,Comment#3"}));
        // The cube and rollup variants reuse the grouping-sets expectation.
        this.CumulateWindowCubeExpectedData = this.CumulateWindowGroupSetExpectedData();
        this.CumulateWindowRollupExpectedData = this.CumulateWindowGroupSetExpectedData();
        // Time zone constant for the "Asia/Shanghai" region.
        this.SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
    }
}

