/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.api.typeutils.CaseClassTypeInfo;
import org.apache.flink.table.api.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy$;
import org.apache.flink.table.planner.runtime.utils.TestSinkUtil$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.types.DataType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.StringContext;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005}a\u0001\u0002\u000e\u001c\u00011BQa\r\u0001\u0005\u0002QBqa\u000e\u0001C\u0002\u0013%\u0001\b\u0003\u0004=\u0001\u0001\u0006I!\u000f\u0005\b{\u0001\u0011\r\u0011\"\u0001?\u0011\u0019)\u0005\u0001)A\u0005\u007f!9a\t\u0001b\u0001\n\u0003q\u0004BB$\u0001A\u0003%q\bC\u0004I\u0001\t\u0007I\u0011\u0001 \t\r%\u0003\u0001\u0015!\u0003@\u0011\u0015Q\u0005\u0001\"\u0001L\u0011\u0015i\u0006\u0001\"\u0001L\u0011\u0015\u0011\u0007\u0001\"\u0001L\u0011\u0015!\u0007\u0001\"\u0001L\u0011\u00151\u0007\u0001\"\u0001L\u0011\u0015A\u0007\u0001\"\u0001L\u0011\u0015Q\u0007\u0001\"\u0001L\u0011\u0015a\u0007\u0001\"\u0001L\u0011\u0015q\u0007\u0001\"\u0001L\u0011\u0015\u0001\b\u0001\"\u0001L\u0011\u0015\u0011\b\u0001\"\u0001L\u0011\u0015!\b\u0001\"\u0001L\u0011\u00151\b\u0001\"\u0001L\u0011\u0015A\b\u0001\"\u0001L\u0011\u0015Q\b\u0001\"\u0001L\u0011\u0015a\b\u0001\"\u0003~\u0005ii\u0015N\\5CCR\u001c\u0007.\u00138uKJ4\u0018\r\\%oM\u0016\u0014H+Z:u\u0015\taR$A\u0002tc2T!AH\u0010\u0002\rM$(/Z1n\u0015\t\u0001\u0013%\u0001\u0003qY\u0006t'B\u0001\u0012$\u0003\u001d\u0001H.\u00198oKJT!\u0001J\u0013\u0002\u000bQ\f'\r\\3\u000b\u0005\u0019:\u0013!\u00024mS:\\'B\u0001\u0015*\u0003\u0019\t\u0007/Y2iK*\t!&A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001[A\u0011a&M\u0007\u0002_)\u0011\u0001'I\u0001\u0006kRLGn]\u0005\u0003e=\u0012Q\u0002V1cY\u0016$Vm\u001d;CCN,\u0017A\u0002\u001fj]&$h\bF\u00016!\t1\u0004!D\u0001\u001c\u0003\u0011)H/\u001b7\u0016\u0003e\u0002\"A\f\u001e\n\u0005mz#aE*ue\u0016\fW\u000eV1cY\u0016$Vm\u001d;Vi&d\u0017!B;uS2\u0004\u0013AB*U%&su)F\u0001@!\t\u00015)D\u0001B\u0015\t\u00115%A\u0003usB,7/\u0003\u0002E\u0003\nAA)\u0019;b)f\u0004X-A\u0004T)JKej\u0012\u0011\u0002\t1{ejR\u0001\u0006\u0019>su\tI\u0001\u0004\u0013:#\u0016\u0001B%O)\u0002\nQa]3ukB$\u0012\u0001\u0014\t\u0003\u001bBk\u0011A\u0014\u0006\u0002\u001f\u0006)1oY1mC&\u0011\u0011K\u0014\u0002\u0005+:LG\u000f\u000b\u0002\u000b'B\u0011AkW\u0007\u0002+*\u0011akV\u0001\u0004CBL'B\u0001-Z\u0003\u00
1dQW\u000f]5uKJT!AW\u0015\u0002\u000b),h.\u001b;\n\u0005q+&A\u0003\"fM>\u0014X-R1dQ\u0006\tB/Z:u\u001b&t\u0017NQ1uG\"|e\u000e\\=)\u0005-y\u0006C\u0001+a\u0013\t\tWK\u0001\u0003UKN$\u0018A\b;fgRl\u0015N\\5CCR\u001c\u0007n\u00148ms\u001a{'\u000fR1uCN#(/Z1nQ\taq,\u0001\u0011uKN$(+\u001a3v]\u0012\fg\u000e^,bi\u0016\u0014X.\u0019:l\t\u00164\u0017N\\5uS>t\u0007FA\u0007`\u0003]!Xm\u001d;XS:$wn^,ji\",\u0015M\u001d7z\r&\u0014X\r\u000b\u0002\u000f?\u0006\tB/Z:u/&tGm\\<DCN\u001c\u0017\rZ3)\u0005=y\u0016!\b;fgRLe\u000e^3sm\u0006d'j\\5o/&$\b.T5oS\n\u000bGo\u00195)\u0005Ay\u0016\u0001\t;fgR\u0014vn\u001e;j[\u0016\u0014vn^:Pm\u0016\u0014x+\u001b;i\u001b&t\u0017NQ1uG\"D#!E0\u0002UQ,7\u000f\u001e+f[B|'/\u00197UC\ndWMR;oGRLwN\u001c&pS:<\u0016\u000e\u001e5NS:L')\u0019;dQ\"\u0012!cX\u0001!i\u0016\u001cH/T;mi&|\u0005/\u001a:bi>\u0014h*Z3eg^\u000bG/\u001a:nCJ\\\u0017\u0007\u000b\u0002\u0014?\u0006\u0001C/Z:u\u001bVdG/[(qKJ\fGo\u001c:OK\u0016$7oV1uKJl\u0017M]63Q\t!r,\u0001\u0011uKN$X*\u001e7uS>\u0003XM]1u_JtU-\u001a3t/\u0006$XM]7be.\u001c\u0004FA\u000b`\u0003q!Xm\u001d;Nk2$\u0018\u000e\u001d7f/&tGm\\<BO\u001e\u0014XmZ1uKND#AF0\u0002IQ,7\u000f^'j]&\u0014\u0015\r^2i\u001f:$\u0015\r^1TiJ,\u0017-\\,ji\"\u0014vn\u001e+j[\u0016D#aF0\u0002]Q,7\u000f^(wKJ<\u0016N\u001c3po6Kg.\u001b\"bi\u000eDwJ\u001c#bi\u0006\u001cFO]3b[^KG\u000f\u001b*poRKW.\u001a\u0015\u00031}\u000b!c^5uQ\u0016\u000b'\u000f\\=GSJ,G)\u001a7bsR!AJ`A\u0006\u0011\u0019y\u0018\u00041\u0001\u0002\u0002\u0005YA/\u00192mK\u000e{gNZ5h!\u0011\t\u0019!a\u0002\u000e\u0005\u0005\u0015!B\u0001,$\u0013\u0011\tI!!\u0002\u0003\u0017Q\u000b'\r\\3D_:4\u0017n\u001a\u0005\b\u0003\u001bI\u0002\u0019AA\b\u0003!Ig\u000e^3sm\u0006d\u0007\u0003BA\t\u00037i!!a\u0005\u000b\t\u0005U\u0011qC\u0001\u0005i&lWM\u0003\u0002\u0002\u001a\u0005!!.\u0019<b\u0013\u0011\ti\"a\u0005\u0003\u0011\u0011+(/\u0019;j_:\u0004")
public class MiniBatchIntervalInferTest
extends TableTestBase {
    // Stream planner test utility, obtained from the inherited streamTestUtil(...) factory
    // called with its (Scala-generated) default argument.
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());
    // Data type constants; exposed through the STRING()/LONG()/INT() accessors below.
    private final DataType STRING = DataTypes.STRING();
    // Note: LONG maps to the SQL BIGINT type.
    private final DataType LONG = DataTypes.BIGINT();
    private final DataType INT = DataTypes.INT();

    /** Accessor for the {@link StreamTableTestUtil} instance held by this test class. */
    private StreamTableTestUtil util() {
        return util;
    }

    /** Accessor for the {@code STRING} data type constant ({@code DataTypes.STRING()}). */
    public DataType STRING() {
        return STRING;
    }

    /** Accessor for the {@code LONG} data type constant ({@code DataTypes.BIGINT()}). */
    public DataType LONG() {
        return LONG;
    }

    /** Accessor for the {@code INT} data type constant ({@code DataTypes.INT()}). */
    public DataType INT() {
        return INT;
    }

    /**
     * Registers the fixtures shared by the tests and enables mini-batch execution.
     *
     * <p>Runs before each test. It registers two data streams ({@code MyDataStream1},
     * {@code MyDataStream2}) with fields {@code a}, {@code b}, {@code c} plus a
     * {@code proctime} and a {@code rowtime} attribute, creates the table {@code MyTable1}
     * on the {@code values} connector, derives {@code wmTable1} and {@code wmTable2} from it
     * via {@code LIKE ... (INCLUDING ALL)} with a watermark on {@code rowtime}, and finally
     * sets {@code TABLE_EXEC_MINIBATCH_ENABLED} to {@code true}.
     */
    @BeforeEach
    public void setup() {
        // First stream: Tuple3<Object, String, Object> type info written out as an anonymous
        // CaseClassTypeInfo subclass (this shape is what the decompiler produced).
        this.util().addDataStream("MyDataStream1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor exposing the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$1 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field, then returns a ScalaCaseClassSerializer over them.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$1` is not declared in the visible scope;
                    // presumably the decompiler's name for the captured `fieldSerializers` array.
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): this instance is assigned to `unused` and never read; the method
                // returns a separately constructed serializer below. Confirm this is intentional.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    // Rebuilds the tuple from the field array: (int, String, long).
                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle array contents below are
            // decompiler output and are not valid Java syntax.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$$anon$1 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Second stream: same fields and same type-info shape as MyDataStream1.
        this.util().addDataStream("MyDataStream2", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor exposing the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$3 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field, then returns a ScalaCaseClassSerializer over them.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$2` is not declared in the visible scope;
                    // presumably the decompiler's name for the captured `fieldSerializers` array.
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): assigned to `unused` and never read; see the returned instance below.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    // Rebuilds the tuple from the field array: (int, String, long).
                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle array contents below are
            // decompiler output and are not valid Java syntax.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$$anon$3 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Base table on the 'values' connector; note it declares no watermark itself.
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE MyTable1 (\n                     |  `a` INT,\n                     |  `b` STRING,\n                     |  `c` BIGINT,\n                     |  proctime AS PROCTIME(),\n                     |  rowtime TIMESTAMP(3)\n                     |) WITH (\n                     |  'connector' = 'values'\n                     |)\n                     |")).stripMargin());
        // Two watermarked variants of MyTable1 (WATERMARK FOR rowtime AS rowtime).
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE wmTable1 (\n                     |  WATERMARK FOR rowtime AS rowtime\n                     |) LIKE MyTable1 (INCLUDING ALL)\n                     |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE wmTable2 (\n                     |  WATERMARK FOR rowtime AS rowtime\n                     |) LIKE MyTable1 (INCLUDING ALL)\n                     |")).stripMargin());
        // Mini-batch is switched on for every test; each test sets its own allowed latency.
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
    }

    /**
     * Verifies the exec plan of a grouped aggregation over {@code MyTable1} with the
     * mini-batch allowed latency set to one second.
     */
    @Test
    public void testMiniBatchOnly() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));
        util().verifyExecPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b");
    }

    /**
     * Verifies the exec plan of a grouped aggregation over the registered data stream
     * {@code MyDataStream1} with the mini-batch allowed latency set to one second.
     */
    @Test
    public void testMiniBatchOnlyForDataStream() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));
        util().verifyExecPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b");
    }

    /**
     * Verifies the exec plan of a grouped (non-windowed) aggregation over the watermarked
     * table {@code wmTable1} with the mini-batch allowed latency set to one second.
     */
    @Test
    public void testRedundantWatermarkDefinition() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));
        util().verifyExecPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b");
    }

    /**
     * Verifies the exec plan of a HOP window aggregation nested inside a grouped aggregation,
     * with a one-second mini-batch latency and a 500 ms early-fire delay configured.
     */
    @Test
    public void testWindowWithEarlyFire() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));
        withEarlyFireDelay(config, Duration.ofMillis(500L));

        final String rawQuery = "\n        | SELECT b, SUM(cnt)\n        | FROM (\n        |   SELECT b,\n        |     COUNT(a) as cnt,\n        |     HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,\n        |     HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end\n        |   FROM wmTable1\n        |   GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        | )\n        | GROUP BY b\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of two cascaded TUMBLE window aggregations (10 s, then 5 s on the
     * inner window's rowtime) with the mini-batch allowed latency set to three seconds.
     */
    @Test
    public void testWindowCascade() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(3L));

        final String rawQuery = "\n        | SELECT b,\n        |   SUM(cnt)\n        | FROM (\n        |   SELECT b,\n        |     COUNT(a) as cnt,\n        |     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt\n        |   FROM wmTable1\n        |   GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)\n        | )\n        | GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a grouped aggregation on top of a rowtime interval join between
     * {@code wmTable1} and {@code wmTable2}, with a one-second mini-batch allowed latency.
     */
    @Test
    public void testIntervalJoinWithMiniBatch() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));

        final String rawQuery = "\n        | SELECT b, COUNT(a)\n        | FROM (\n        |   SELECT t1.a as a, t1.b as b\n        |   FROM\n        |     wmTable1 as t1 JOIN wmTable2 as t2\n        |   ON\n        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |     t2.rowtime + INTERVAL '10' SECOND\n        | )\n        | GROUP BY b\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a grouped aggregation on top of a rowtime-ordered ROWS OVER
     * window on {@code wmTable1}, with a one-second mini-batch allowed latency.
     */
    @Test
    public void testRowtimeRowsOverWithMiniBatch() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));

        final String rawQuery = "\n        | SELECT cnt, COUNT(c)\n        | FROM (\n        |   SELECT c, COUNT(a)\n        |   OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt\n        |   FROM wmTable1\n        | )\n        | GROUP BY cnt\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a grouped aggregation on top of a temporal table function join.
     *
     * <p>{@code Orders} and {@code RatesHistory} are registered from the two data streams with a
     * watermark on {@code rowtime}; {@code Rates} is a temporal table function over
     * {@code RatesHistory} with time attribute {@code rowtime} and key {@code b}. The mini-batch
     * allowed latency is one second.
     */
    @Test
    public void testTemporalTableFunctionJoinWithMiniBatch() {
        final Table ordersSource = util().tableEnv().from("MyDataStream1");
        util().addTableWithWatermark("Orders", ordersSource, "rowtime", 0L);
        final Table ratesSource = util().tableEnv().from("MyDataStream2");
        util().addTableWithWatermark("RatesHistory", ratesSource, "rowtime", 0L);

        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));

        // Build the temporal table function: Rates(rowtime) keyed by column b.
        final Table ratesHistory = util().tableEnv().from("RatesHistory");
        final Expression timeAttribute = package$.MODULE$.FieldExpression(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"rowtime"}))).$((Seq)Nil$.MODULE$);
        final Expression primaryKey = package$.MODULE$.FieldExpression(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"b"}))).$((Seq)Nil$.MODULE$);
        final UserDefinedFunction rates = (UserDefinedFunction)ratesHistory.createTemporalTableFunction(timeAttribute, primaryKey);
        util().addTemporarySystemFunction("Rates", rates);

        final String rawQuery = "\n        | SELECT r_a, COUNT(o_a)\n        | FROM (\n        |   SELECT o.a as o_a, r.a as r_a\n        |   FROM Orders As o,\n        |   LATERAL TABLE (Rates(o.rowtime)) as r\n        |   WHERE o.b = r.b\n        | )\n        | GROUP BY r_a\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a TUMBLE window aggregation on top of a rowtime interval join
     * between {@code wmTable1} and {@code wmTable2}, with a one-second mini-batch latency.
     */
    @Test
    public void testMultiOperatorNeedsWatermark1() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));

        final String rawQuery = "\n        | SELECT\n        |   b, COUNT(a),\n        |   TUMBLE_START(rt, INTERVAL '5' SECOND),\n        |   TUMBLE_END(rt, INTERVAL '5' SECOND)\n        | FROM (\n        |   SELECT t1.a as a, t1.b as b, t1.rowtime as rt\n        |   FROM\n        |     wmTable1 as t1 JOIN wmTable2 as t2\n        |   ON\n        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |     t2.rowtime + INTERVAL '10' SECOND\n        | )\n        | GROUP BY b,TUMBLE(rt, INTERVAL '5' SECOND)\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a ROWS OVER window on top of an interval join whose inputs are
     * a TUMBLE aggregation on {@code wmTable1} and a HOP aggregation on {@code wmTable2},
     * with the mini-batch allowed latency set to six seconds.
     */
    @Test
    public void testMultiOperatorNeedsWatermark2() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(6L));

        final String rawQuery = "\n        | SELECT b, COUNT(a)\n        | OVER (PARTITION BY b ORDER BY rt ROWS BETWEEN 5 preceding AND CURRENT ROW)\n        | FROM (\n        |  SELECT t1.a as a, t1.b as b, t1.rt as rt\n        |  FROM\n        |  (\n        |    SELECT b,\n        |     COUNT(a) as a,\n        |     TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt\n        |    FROM wmTable1\n        |    GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)\n        |  ) as t1\n        |  JOIN\n        |  (\n        |    SELECT b,\n        |     COUNT(a) as a,\n        |     HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt\n        |    FROM wmTable2\n        |    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        |  ) as t2\n        |  ON\n        |    t1.a = t2.a AND t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND\n        |    t2.rt + INTERVAL '10' SECOND\n        | )\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a regular join between a grouped aggregation on
     * {@code MyTable1} and a grouped aggregation over a HOP window aggregation on
     * {@code wmTable1}, with the mini-batch allowed latency set to six seconds.
     */
    @Test
    public void testMultiOperatorNeedsWatermark3() {
        final TableConfig config = util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(6L));

        final String rawQuery = "\n        |  SELECT t1.a, t1.b\n        |  FROM (\n        |    SELECT a, COUNT(b) as b FROM MyTable1 GROUP BY a\n        |  ) as t1\n        |  JOIN (\n        |    SELECT b, COUNT(a) as a\n        |    FROM (\n        |      SELECT b, COUNT(a) as a,\n        |         HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt\n        |      FROM wmTable1\n        |      GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        |    )\n        |    GROUP BY b\n        |  ) as t2\n        |  ON t1.a = t2.a\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        util().verifyExecPlan(query);
    }

    @Test
    public void testMultipleWindowAggregates() {
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE T1 (\n                     | id1 INT,\n                     | rowtime TIMESTAMP(3),\n                     | `text` STRING,\n                     | WATERMARK FOR rowtime AS rowtime\n                     |) WITH (\n                     | 'connector' = 'values'\n                     |)\n                     |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE T2 (\n                     | id2 INT,\n                     | rowtime TIMESTAMP(3),\n                     | cnt INT,\n                     | name STRING,\n                     | goods STRING,\n                     | WATERMARK FOR rowtime AS rowtime\n                     |) WITH (\n                     | 'connector' = 'values'\n                     |)\n                     |")).stripMargin());
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofMillis(500L));
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, (Object)BoxesRunTime.boxToLong((long)300L));
        Table table1 = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                          |SELECT id1, T1.rowtime AS ts, text\n                                          |  FROM T1, T2\n                                          |WHERE id1 = id2\n                                          |      AND T1.rowtime > T2.rowtime - INTERVAL '5' MINUTE\n                                          |      AND T1.rowtime < T2.rowtime + INTERVAL '3' MINUTE\n      ")).stripMargin());
        this.util().tableEnv().createTemporaryView("TempTable1", table1);
        Table table2 = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                          |SELECT id1,\n                                          |    LISTAGG(text, '#') as text,\n                                          |    TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts\n                                          |FROM TempTable1\n                                          |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1\n      ")).stripMargin());
        this.util().tableEnv().createTemporaryView("TempTable2", table2);
        Table table3 = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                               |SELECT id1,\n                               |    LISTAGG(text, '*')\n                               |FROM TempTable2\n                               |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1\n      ")).stripMargin());
        TestSinkUtil$.MODULE$.addValuesSink(this.util().tableEnv(), "appendSink1", (List<String>)new .colon.colon((Object)"a", (List)new .colon.colon((Object)"b", (List)Nil$.MODULE$)), (List<DataType>)new .colon.colon((Object)this.INT(), (List)new .colon.colon((Object)this.STRING(), (List)Nil$.MODULE$)), ChangelogMode.insertOnly(), TestSinkUtil$.MODULE$.addValuesSink$default$6());
        stmtSet.addInsert("appendSink1", table3);
        Table table4 = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                          |SELECT id1,\n                                          |    LISTAGG(text, '-')\n                                          |FROM TempTable1\n                                          |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1\n      ")).stripMargin());
        TestSinkUtil$.MODULE$.addValuesSink(this.util().tableEnv(), "appendSink2", (List<String>)new .colon.colon((Object)"a", (List)new .colon.colon((Object)"b", (List)Nil$.MODULE$)), (List<DataType>)new .colon.colon((Object)this.INT(), (List)new .colon.colon((Object)this.STRING(), (List)Nil$.MODULE$)), ChangelogMode.insertOnly(), TestSinkUtil$.MODULE$.addValuesSink$default$6());
        stmtSet.addInsert("appendSink2", table4);
        Table table5 = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                          |SELECT id1,\n                                          |    COUNT(text)\n                                          |FROM TempTable2\n                                          |GROUP BY id1\n      ")).stripMargin());
        TestSinkUtil$.MODULE$.addValuesSink(this.util().tableEnv(), "retractSink3", (List<String>)new .colon.colon((Object)"a", (List)new .colon.colon((Object)"b", (List)Nil$.MODULE$)), (List<DataType>)new .colon.colon((Object)this.INT(), (List)new .colon.colon((Object)this.LONG(), (List)Nil$.MODULE$)), ChangelogMode.all(), TestSinkUtil$.MODULE$.addValuesSink$default$6());
        stmtSet.addInsert("retractSink3", table5);
        this.util().verifyExplain(stmtSet);
    }

    /**
     * Verifies the exec plan of a 10 s TUMBLE window aggregation over a data stream with a
     * rowtime attribute, with the mini-batch allowed latency set to one second.
     *
     * <p>The stream {@code T1} is registered inside the test with fields {@code long},
     * {@code int}, {@code str} and a {@code rowtime} attribute.
     */
    @Test
    public void testMiniBatchOnDataStreamWithRowTime() {
        // Type info for Tuple3<Object, Object, String>, written out as an anonymous
        // CaseClassTypeInfo subclass (this shape is what the decompiler produced).
        this.util().addDataStream("T1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "long")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "int")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "str")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor exposing the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field, then returns a ScalaCaseClassSerializer over them.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$3` is not declared in the visible scope;
                    // presumably the decompiler's name for the captured `fieldSerializers` array.
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): assigned to `unused` and never read; the method returns a
                // separately constructed serializer below. Confirm this is intentional.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    // Rebuilds the tuple from the field array: (long, int, String).
                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle array contents below are
            // decompiler output and are not valid Java syntax.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$$anon$5 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT long,\n        |  COUNT(str) as cnt,\n        |  TUMBLE_END(rowtime, INTERVAL '10' SECOND) as rt\n        |FROM T1\n        |GROUP BY long, TUMBLE(rowtime, INTERVAL '10' SECOND)\n      ")).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Verifies the exec plan of a grouped aggregation on top of a rowtime-ordered ROWS OVER
     * window over a data stream, with the mini-batch allowed latency set to three seconds.
     *
     * <p>The stream {@code T1} is registered inside the test with fields {@code long},
     * {@code int}, {@code str} and a {@code rowtime} attribute.
     */
    @Test
    public void testOverWindowMiniBatchOnDataStreamWithRowTime() {
        // Type info for Tuple3<Object, Object, String>, written out as an anonymous
        // CaseClassTypeInfo subclass (this shape is what the decompiler produced).
        this.util().addDataStream("T1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "long")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "int")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "str")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor exposing the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$7 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field, then returns a ScalaCaseClassSerializer over them.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$4` is not declared in the visible scope;
                    // presumably the decompiler's name for the captured `fieldSerializers` array.
                    fieldSerializers$4[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): assigned to `unused` and never read; the method returns a
                // separately constructed serializer below. Confirm this is intentional.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    // Rebuilds the tuple from the field array: (long, int, String).
                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle array contents below are
            // decompiler output and are not valid Java syntax.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$4(org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$$anon$7 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(3L));
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT cnt, COUNT(`int`)\n        | FROM (\n        |   SELECT `int`,\n        |    COUNT(str) OVER\n        |      (PARTITION BY long ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt\n        |   FROM T1\n        | )\n        | GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Enables early-fire emission on the given config and sets its delay to {@code interval}.
     *
     * @param tableConfig the table config to update
     * @param interval the early-fire delay to configure (stored with millisecond precision)
     * @throws RuntimeException if an early-fire delay is already configured and its value in
     *     milliseconds differs from {@code interval}
     */
    private void withEarlyFireDelay(TableConfig tableConfig, Duration interval) {
        final long requestedMillis = interval.toMillis();

        // A previously configured delay is only acceptable when it matches the requested one.
        final Duration configuredDelay = tableConfig.getOptional(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_DELAY()).orElse(null);
        final boolean alreadyConfigured = configuredDelay != null;
        if (alreadyConfigured && configuredDelay.toMillis() != requestedMillis) {
            throw new RuntimeException("Currently not support different earlyFireInterval configs in one job");
        }

        tableConfig.set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED(), (Object)BoxesRunTime.boxToBoolean((boolean)true));
        tableConfig.set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_DELAY(), (Object)Duration.ofMillis(requestedMillis));
    }
}

