/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.table.api.bridge.scala.StreamStatementSet;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdf;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase$;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\u0005Mb\u0001B\n\u0015\u0001\u0015B\u0001\u0002\f\u0001\u0003\u0002\u0003\u0006I!\f\u0005\u0006\u000b\u0002!\tA\u0012\u0005\u0006\u0015\u0002!\te\u0013\u0005\u0006;\u0002!\ta\u0013\u0005\u0006S\u0002!\ta\u0013\u0005\u0006W\u0002!\ta\u0013\u0005\u0006[\u0002!\ta\u0013\u0005\u0006_\u0002!\ta\u0013\u0005\u0006c\u0002!\ta\u0013\u0005\u0006g\u0002!\ta\u0013\u0005\u0006k\u0002!\ta\u0013\u0005\u0006o\u0002!\ta\u0013\u0005\u0006s\u0002!\ta\u0013\u0005\u0006w\u0002!\ta\u0013\u0005\u0006{\u0002!\ta\u0013\u0005\u0006\u007f\u0002!\ta\u0013\u0005\u0007\u0003\u0007\u0001A\u0011A&\t\r\u0005\u001d\u0001\u0001\"\u0001L\u0005=!\u0016M\u00197f'&t7.\u0013+DCN,'BA\u000b\u0017\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003/a\taa\u001d;sK\u0006l'BA\r\u001b\u0003\u001d\u0011XO\u001c;j[\u0016T!a\u0007\u000f\u0002\u000fAd\u0017M\u001c8fe*\u0011QDH\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003?\u0001\nQA\u001a7j].T!!\t\u0012\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0019\u0013aA8sO\u000e\u00011C\u0001\u0001'!\t9#&D\u0001)\u0015\tI\u0003$A\u0003vi&d7/\u0003\u0002,Q\tQ2\u000b\u001e:fC6LgnZ,ji\"\u001cF/\u0019;f)\u0016\u001cHOQ1tK\u0006!Qn\u001c3f!\tq#I\u0004\u00020\u0001:\u0011\u0001g\u0010\b\u0003cyr!AM\u001f\u000f\u0005MbdB\u0001\u001b<\u001d\t)$H\u0004\u00027s5\tqG\u0003\u00029I\u00051AH]8pizJ\u0011aI\u0005\u0003C\tJ!a\b\u0011\n\u0005uq\u0012BA\u000e\u001d\u0013\tI\"$\u0003\u0002*1%\u0011\u0011\tK\u0001\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/Z\u0005\u0003\u0007\u0012\u0013\u0001c\u0015;bi\u0016\u0014\u0015mY6f]\u0012lu\u000eZ3\u000b\u0005\u0005C\u0013A\u0002\u001fj]&$h\b\u0006\u0002H\u0013B\u0011\u0001\nA\u0007\u0002)!)AF\u0001a\u0001[\u00051!-\u001a4pe\u0016$\u0012\u0001\u0014\t\u0003\u001bBk\u0011A\u0014\u0006\u0002\u001f\u0006)1oY1mC&\u0011\u0011K\u0014\u0002\u0005+:LG\u000f\u000b\u0002\u0004'B\u0011AkW\u0007\u0002+*\u0011akV\u0001\u0004CBL'B\u0001-Z\u0003\u001dQW\u000f]5uKJT!A\u0017\u0012\u0002\u000b),h.\u001b;\n\u0005q+&A\u0003\"fM>\u0014X-R1dQ\u0006IB/Z:u\u0015>Lg\u000eR5t_J$WM]\"iC:<W\rT8hQ\t!q\f\u0005\u0002UA&\u0011\u0011-\u0016\u0002\r)\u0016\u001cH\u000fV3na2\fG/\u001a\u0015\u0005\t\r4w\r\u0005\u0002UI&\u0011Q-\u0016\u0002\t\t&\u001c\u0018M\u00197fI\u0006)a/\u00197vK\u0006\n\u0001.A\u0006G\u0019&s5*L\u001a7cY2\u0014!\u0007;fgR\u001c\u0016N\\6ESN|'\u000fZ3s\u0007\"\fgnZ3M_\u001eD#!B0\u0002CQ,7\u000f^*j].$\u0015n]8sI\u0016\u00148\t[1oO\u0016dunZ,ji\"\u0014\u0016M\\6)\u0005\u0019y\u0016A\u0010;fgR\u001c\u0005.\u00198hK2|wmU8ve\u000e,w+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e4UO\\2TS:\\w+\u001b;i\t&4g-\u001a:f]R\u00046\u000e\u000b\u0002\b?\u0006!B/Z:u\u0013:\u001cXM\u001d;QCJ$8i\u001c7v[:D#\u0001C0\u0002/Q,7\u000f^\"sK\u0006$X\rV1cY\u0016\f5oU3mK\u000e$\bFA\u0005`\u0003\u0011\"Xm\u001d;De\u0016\fG/\u001a+bE2,\u0017i]*fY\u0016\u001cGoV5uQN{'\u000f\u001e'j[&$\bF\u0001\u0006`\u0003\u0015\"Xm\u001d;De\u0016\fG/\u001a+bE2,\u0017i]*fY\u0016\u001cGoV5uQ>,Ho\u00149uS>t7\u000f\u000b\u0002\f?\u0006IC/Z:u\u0007J,\u0017\r^3UC\ndW-Q:TK2,7\r^,ji\"\u001cu\u000e\\;n]>\u0013H-\u001a:j]\u001eD#\u0001D0\u0002SQ,7\u000f^\"sK\u0006$X\rV1cY\u0016\f5oU3mK\u000e$x+\u001b;i\u001d\u0016<8i\u001c7v[:\u001cxJ\u001c7zQ\tiq,A\u0018uKN$8I]3bi\u0016$\u0016M\u00197f\u0003N\u001cV\r\\3di^KG\u000f[\"pYVlgn\u001d$s_6\fV/\u001a:z\u001f:d\u0017\u0010\u000b\u0002\u000f?\u0006ID/Z:u\u0007J,\u0017\r^3UC\ndW-Q:TK2,7\r^,ji\"l\u0015\u000e_(g\u001d\u0016<8i\u001c7v[:\u001c\u0018I\u001c3Rk\u0016\u0014\u0018pQ8mk6t7\u000f\u000b\u0002\u0010?\u0006\tB/Z:u!\u0006\u0014H/[1m\u0013:\u001cXM\u001d;)\u0005Ay\u0016!\u0005;fgRLen]3si^KG\u000f[\"U\u000b\"\u0012\u0011cX\u0001 i\u0016\u001cH/\u00169tKJ$8+\u001b8l/&$\bNR1jY&twmU8ve\u000e,\u0007F\u0001\n`Q\u0019\u0001\u0011Q\u00024\u0002\u001aA!\u0011qBA\u000b\u001b\t\t\tBC\u0002\u0002\u0014U\u000b\u0011\"\u001a=uK:\u001c\u0018n\u001c8\n\t\u0005]\u0011\u0011\u0003\u0002\u000b\u000bb$XM\u001c3XSRDGFAA\u000eG\t\ti\u0002\u0005\u0003\u0002 \u0005=RBAA\u0011\u0015\u0011\t\u0019#!\n\u0002\u001bA\f'/Y7fi\u0016\u0014\u0018N_3e\u0015\u0011\t9#!\u000b\u0002\u0015\u0015DH/\u001a8tS>t7OC\u0002[\u0003WQ1!!\f\u001f\u0003%!Xm\u001d;vi&d7/\u0003\u0003\u00022\u0005\u0005\"A\u0007)be\u0006lW\r^3sSj,G\rV3ti\u0016CH/\u001a8tS>t\u0007")
public class TableSinkITCase
extends StreamingWithStateTestBase {
    @Override
    @BeforeEach
    public void before() {
        super.before();
        String srcDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"jason", BoxesRunTime.boxToLong((long)1L)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"jason", BoxesRunTime.boxToLong((long)1L)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"jason", BoxesRunTime.boxToLong((long)1L)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"jason", BoxesRunTime.boxToLong((long)1L)})), (List)Nil$.MODULE$))))));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(219).append("\n                       |CREATE TABLE src (person String, votes BIGINT) WITH(\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(srcDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String awardDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToDouble((double)5.2)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), BoxesRunTime.boxToDouble((double)12.1)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), BoxesRunTime.boxToDouble((double)18.3)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)4L), BoxesRunTime.boxToDouble((double)22.5)})), (List)Nil$.MODULE$))))));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(183).append("\n         |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) NOT ENFORCED) WITH(\n         |  'connector' = 'values',\n         |  'data-id' = '").append(awardDataId).append("'\n         |)\n         |").toString())).stripMargin());
        String peopleDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"jason", BoxesRunTime.boxToInteger((int)22)})), (List)Nil$.MODULE$)));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(181).append("\n         |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT ENFORCED) WITH(\n         |  'connector' = 'values',\n         |  'data-id' = '").append(peopleDataId).append("'\n         |)\n         |").toString())).stripMargin());
        String userDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.userChangelog());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(511).append("\n                       |CREATE TABLE users (\n                       |  user_id STRING,\n                       |  user_name STRING,\n                       |  email STRING,\n                       |  balance DECIMAL(18,2),\n                       |  primary key (user_id) not enforced\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '").append(userDataId).append("',\n                       | 'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |").toString())).stripMargin());
    }

    @Disabled(value="FLINK-36166")
    @TestTemplate
    public void testJoinDisorderChangeLog() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE JoinDisorderChangeLog (\n                      |  person STRING, votes BIGINT, prize DOUBLE, age INT,\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH(\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |INSERT INTO JoinDisorderChangeLog\n                    |SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM\n                    | (SELECT T.person, T.sum_votes, award.prize FROM\n                    |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T,\n                    |   award\n                    |   WHERE T.sum_votes = award.votes) T1, people T2\n                    | WHERE T1.person = T2.person\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("JoinDisorderChangeLog");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 4, 22.5, 22]", (List)Nil$.MODULE$);
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testSinkDisorderChangeLog() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE SinkDisorderChangeLog (\n                      |  person STRING, votes BIGINT, prize DOUBLE,\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH(\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |INSERT INTO SinkDisorderChangeLog\n                    |SELECT T.person, T.sum_votes, award.prize FROM\n                    |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, award\n                    |   WHERE T.sum_votes = award.votes\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("SinkDisorderChangeLog");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 4, 22.5]", (List)Nil$.MODULE$);
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testSinkDisorderChangeLogWithRank() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE SinkRankChangeLog (\n                      |  person STRING, votes BIGINT,\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH(\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n          |INSERT INTO SinkRankChangeLog\n          |SELECT person, sum_votes FROM\n          | (SELECT person, sum_votes,\n          |   ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes DESC) AS rank_number\n          |   FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS vote_section FROM src\n          |      GROUP BY person))\n          |   WHERE rank_number < 10\n          |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("SinkRankChangeLog");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 4]", (List)Nil$.MODULE$);
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testChangelogSourceWithNonDeterministicFuncSinkWithDifferentPk() {
        this.tEnv().createTemporaryFunction("ndFunc", (UserDefinedFunction)new TestNonDeterministicUdf());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE sink_with_pk (\n                      |  user_id STRING,\n                      |  user_name STRING,\n                      |  email STRING,\n                      |  balance DECIMAL(18,2),\n                      |  PRIMARY KEY(email) NOT ENFORCED\n                      |) WITH(\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                     |insert into sink_with_pk\n                     |select user_id, SPLIT_INDEX(ndFunc(user_name), '-', 0), email, balance\n                     |from users\n                     |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("sink_with_pk");
        .colon.colon expected = new .colon.colon((Object)"+I[user1, Tom, tom123@gmail.com, 8.10]", (List)new .colon.colon((Object)"+I[user3, Bailey, bailey@qq.com, 9.99]", (List)new .colon.colon((Object)"+I[user4, Tina, tina@gmail.com, 11.30]", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        java.util.List<String> rawResult = TestValuesTableFactory.getRawResultsAsStrings("sink_with_pk");
        List expectedRaw = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"+I[user1, Tom, tom@gmail.com, 10.02]", "+I[user2, Jack, jack@hotmail.com, 71.20]", "-D[user1, Tom, tom@gmail.com, 10.02]", "+I[user1, Tom, tom123@gmail.com, 8.10]", "+I[user3, Bailey, bailey@gmail.com, 9.99]", "-D[user2, Jack, jack@hotmail.com, 71.20]", "+I[user4, Tina, tina@gmail.com, 11.30]", "-D[user3, Bailey, bailey@gmail.com, 9.99]", "+I[user3, Bailey, bailey@qq.com, 9.99]"}));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(rawResult).toList()).isEqualTo((Object)expectedRaw);
    }

    @TestTemplate
    public void testInsertPartColumn() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE zm_test (\n                      |  `person` String,\n                      |  `votes` BIGINT,\n                      |  `m1` MAP<STRING, BIGINT>,\n                      |  `m2` MAP<STRING NOT NULL, BIGINT>,\n                      |  `m3` MAP<STRING, BIGINT NOT NULL>,\n                      |  `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>\n                      |) WITH (\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'true'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |insert into zm_test(`person`, `votes`)\n                    |  select\n                    |    `person`,\n                    |    `votes`\n                    |  from\n                    |    src\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("zm_test");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 1, null, null, null, null]", (List)new .colon.colon((Object)"+I[jason, 1, null, null, null, null]", (List)new .colon.colon((Object)"+I[jason, 1, null, null, null, null]", (List)new .colon.colon((Object)"+I[jason, 1, null, null, null, null]", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testCreateTableAsSelect() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyCtasTable\n                    | WITH (\n                    |   'connector' = 'values',\n                    |   'sink-insert-only' = 'true'\n                    |) AS\n                    |  SELECT\n                    |    `person`,\n                    |    `votes`\n                    |  FROM\n                    |    src\n                    |")).stripMargin()).await();
        java.util.List<String> actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 1]", (List)new .colon.colon((Object)"+I[jason, 1]", (List)new .colon.colon((Object)"+I[jason, 1]", (List)new .colon.colon((Object)"+I[jason, 1]", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actual).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        StreamStatementSet statementSet = this.tEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE MyCtasTableUseStatement\n                                | WITH (\n                                |   'connector' = 'values',\n                                |   'sink-insert-only' = 'true'\n                                |) AS\n                                |  SELECT\n                                |    `person`,\n                                |    `votes`\n                                |  FROM\n                                |    src\n                                |")).stripMargin());
        statementSet.execute().await();
        java.util.List<String> actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement");
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actualUseStatement).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testCreateTableAsSelectWithSortLimit() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyCtasTable\n                    | WITH (\n                    |   'connector' = 'values',\n                    |   'sink-insert-only' = 'false'\n                    |) AS\n                    |  (SELECT\n                    |    `person`,\n                    |    `votes`\n                    |  FROM\n                    |    src order by `votes` LIMIT 2)\n                    |")).stripMargin()).await();
        java.util.List<String> actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 1]", (List)new .colon.colon((Object)"+I[jason, 1]", (List)Nil$.MODULE$));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actual).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testCreateTableAsSelectWithoutOptions() {
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql("CREATE TABLE MyCtasTable AS SELECT `person`, `votes` FROM src")).hasRootCauseMessage("Table options do not contain an option key 'connector' for discovering a connector.");
    }

    @TestTemplate
    public void testCreateTableAsSelectWithColumnOrdering() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyCtasTable(votes, person)\n                    | WITH (\n                    |   'connector' = 'values',\n                    |   'sink-insert-only' = 'true'\n                    |) AS\n                    |  SELECT\n                    |    `person`,\n                    |    `votes`\n                    |  FROM\n                    |    src\n                    |")).stripMargin()).await();
        java.util.List<String> actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        .colon.colon expected = new .colon.colon((Object)"+I[1, jason]", (List)new .colon.colon((Object)"+I[1, jason]", (List)new .colon.colon((Object)"+I[1, jason]", (List)new .colon.colon((Object)"+I[1, jason]", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actual).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        StreamStatementSet statementSet = this.tEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE MyCtasTableUseStatement(votes, person)\n                                | WITH (\n                                |   'connector' = 'values',\n                                |   'sink-insert-only' = 'true'\n                                |) AS\n                                |  SELECT\n                                |    `person`,\n                                |    `votes`\n                                |  FROM\n                                |    src\n                                |")).stripMargin());
        statementSet.execute().await();
        java.util.List<String> actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement");
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actualUseStatement).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testCreateTableAsSelectWithNewColumnsOnly() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyCtasTable(`p1` INT, `p2` STRING)\n                    | WITH (\n                    |   'connector' = 'values',\n                    |   'sink-insert-only' = 'true'\n                    |) AS\n                    |  SELECT\n                    |    `person`,\n                    |    `votes`\n                    |  FROM\n                    |    src\n                    |")).stripMargin()).await();
        java.util.List<String> actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        .colon.colon expected = new .colon.colon((Object)"+I[null, null, jason, 1]", (List)new .colon.colon((Object)"+I[null, null, jason, 1]", (List)new .colon.colon((Object)"+I[null, null, jason, 1]", (List)new .colon.colon((Object)"+I[null, null, jason, 1]", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actual).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        StreamStatementSet statementSet = this.tEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE MyCtasTableUseStatement(`p1` INT, `p2` STRING)\n                                | WITH (\n                                |   'connector' = 'values',\n                                |   'sink-insert-only' = 'true'\n                                |) AS\n                                |  SELECT\n                                |    `person`,\n                                |    `votes`\n                                |  FROM\n                                |    src\n                                |")).stripMargin());
        statementSet.execute().await();
        java.util.List<String> actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement");
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actualUseStatement).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testCreateTableAsSelectWithColumnsFromQueryOnly() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyCtasTable(`person` STRING, `votes` DOUBLE)\n                    | WITH (\n                    |   'connector' = 'values',\n                    |   'sink-insert-only' = 'true'\n                    |) AS\n                    |  SELECT\n                    |    `person`,\n                    |    `votes`\n                    |  FROM\n                    |    src\n                    |")).stripMargin()).await();
        java.util.List<String> actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 1.0]", (List)new .colon.colon((Object)"+I[jason, 1.0]", (List)new .colon.colon((Object)"+I[jason, 1.0]", (List)new .colon.colon((Object)"+I[jason, 1.0]", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actual).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        StreamStatementSet statementSet = this.tEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE MyCtasTableUseStatement(`person` STRING, `votes` DOUBLE)\n        | WITH (\n        |   'connector' = 'values',\n        |   'sink-insert-only' = 'true'\n        |) AS\n        |  SELECT\n        |    `person`,\n        |    `votes`\n        |  FROM\n        |    src\n        |")).stripMargin());
        statementSet.execute().await();
        java.util.List<String> actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement");
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actualUseStatement).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testCreateTableAsSelectWithMixOfNewColumnsAndQueryColumns() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyCtasTable(`p1` INT, `votes` DOUBLE, `votes_2x` AS `votes` * 2)\n                    | WITH (\n                    |   'connector' = 'values',\n                    |   'sink-insert-only' = 'true'\n                    |) AS\n                    |  SELECT\n                    |    `person`,\n                    |    `votes`\n                    |  FROM\n                    |    src\n                    |")).stripMargin()).await();
        java.util.List<String> actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        .colon.colon expected = new .colon.colon((Object)"+I[null, jason, 1.0]", (List)new .colon.colon((Object)"+I[null, jason, 1.0]", (List)new .colon.colon((Object)"+I[null, jason, 1.0]", (List)new .colon.colon((Object)"+I[null, jason, 1.0]", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actual).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        StreamStatementSet statementSet = this.tEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE MyCtasTableUseStatement(`p1` INT, `votes` DOUBLE, `votes_2x` AS `votes` * 2)\n        | WITH (\n        |   'connector' = 'values',\n        |   'sink-insert-only' = 'true'\n        |) AS\n        |  SELECT\n        |    `person`,\n        |    `votes`\n        |  FROM\n        |    src\n        |")).stripMargin());
        statementSet.execute().await();
        java.util.List<String> actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement");
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actualUseStatement).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testPartialInsert() {
        String srcDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), "jason", BoxesRunTime.boxToLong((long)3L), "X", BoxesRunTime.boxToInteger((int)43)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "andy", BoxesRunTime.boxToLong((long)2L), "Y", BoxesRunTime.boxToInteger((int)32)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), "clark", BoxesRunTime.boxToLong((long)1L), "Z", BoxesRunTime.boxToInteger((int)29)})), (List)Nil$.MODULE$)))));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(416).append("\n                       |CREATE TABLE test_source (\n                       |  id bigint,\n                       |  person String,\n                       |  votes bigint,\n                       |  city String,\n                       |  age int)\n                       |WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(srcDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE test_sink (\n                      |  id bigint,\n                      |  person String,\n                      |  votes bigint,\n                      |  city String,\n                      |  age int,\n                      |  primary key(id) not enforced\n                      |) WITH (\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |insert into test_sink (id, person, votes)\n                    |  select\n                    |    id,\n                    |    person,\n                    |    votes\n                    |  from\n                    |    test_source\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("test_sink");
        .colon.colon expected = new .colon.colon((Object)"+I[1, jason, 3, null, null]", (List)new .colon.colon((Object)"+I[2, andy, 2, null, null]", (List)new .colon.colon((Object)"+I[3, clark, 1, null, null]", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |insert into test_sink (id, city, age)\n                    |  select\n                    |    id,\n                    |    city,\n                    |    age \n                    |  from\n                    |    test_source\n                    |")).stripMargin()).await();
        java.util.List<String> result2 = TestValuesTableFactory.getResultsAsStrings("test_sink");
        .colon.colon expected2 = new .colon.colon((Object)"+I[1, jason, 3, X, 43]", (List)new .colon.colon((Object)"+I[2, andy, 2, Y, 32]", (List)new .colon.colon((Object)"+I[3, clark, 1, Z, 29]", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result2).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected2.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testInsertWithCTE() {
        String srcDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), "jason", BoxesRunTime.boxToLong((long)3L), "X", BoxesRunTime.boxToInteger((int)43)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "andy", BoxesRunTime.boxToLong((long)2L), "Y", BoxesRunTime.boxToInteger((int)32)})), (List)new .colon.colon((Object)BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), "clark", BoxesRunTime.boxToLong((long)1L), "Z", BoxesRunTime.boxToInteger((int)29)})), (List)Nil$.MODULE$)))));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(416).append("\n                       |CREATE TABLE test_source (\n                       |  id bigint,\n                       |  person String,\n                       |  votes bigint,\n                       |  city String,\n                       |  age int)\n                       |WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(srcDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE test_sink (\n                    |  id bigint,\n                    |  person String,\n                    |  votes bigint,\n                    |  city String,\n                    |  age int,\n                    |  primary key(id) not enforced\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'sink-insert-only' = 'false'\n                    |)\n                    |")).stripMargin()).await();
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |INSERT INTO test_sink (id, person, votes)\n                    |  WITH cte AS (SELECT\n                    |    id,\n                    |    person,\n                    |    votes\n                    |  FROM\n                    |    test_source) SELECT * FROM cte\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("test_sink");
        .colon.colon expected = new .colon.colon((Object)"+I[1, jason, 3, null, null]", (List)new .colon.colon((Object)"+I[2, andy, 2, null, null]", (List)new .colon.colon((Object)"+I[3, clark, 1, null, null]", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testUpsertSinkWithFailingSource() {
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        Configuration configuration = new Configuration();
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"fixeddelay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, (Object)BoxesRunTime.boxToInteger((int)1));
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, (Object)Duration.ofMillis(0L));
        this.env().configure((ReadableConfig)configuration, Thread.currentThread().getContextClassLoader());
        FailingCollectionSource.reset();
        List data = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{TestValuesTableFactory.changelogRow("+I", BoxesRunTime.boxToInteger((int)1), "Jim"), TestValuesTableFactory.changelogRow("-U", BoxesRunTime.boxToInteger((int)1), "Jim"), TestValuesTableFactory.changelogRow("+U", BoxesRunTime.boxToInteger((int)1), "Ketty"), TestValuesTableFactory.changelogRow("+I", BoxesRunTime.boxToInteger((int)2), "Lilith"), TestValuesTableFactory.changelogRow("-U", BoxesRunTime.boxToInteger((int)2), "Lilith"), TestValuesTableFactory.changelogRow("+I", BoxesRunTime.boxToInteger((int)3), "Sam"), TestValuesTableFactory.changelogRow("-U", BoxesRunTime.boxToInteger((int)3), "Sam"), TestValuesTableFactory.changelogRow("+U", BoxesRunTime.boxToInteger((int)3), "Boob"), TestValuesTableFactory.changelogRow("-D", BoxesRunTime.boxToInteger((int)3), "Boob"), TestValuesTableFactory.changelogRow("+I", BoxesRunTime.boxToInteger((int)4), "Julia")}));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(429).append("\n                       |CREATE TABLE pk_src (\n                       |  id int primary key not enforced,\n                       |  name string\n                       |) with (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'failing-source' = 'true',\n                       |  'data-id' = '").append(TestValuesTableFactory.registerData((Seq<Row>)data)).append("'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE pk_snk (\n                       |  id int primary key not enforced,\n                       |  name string\n                       |) with (\n                       |  'connector' = 'values',\n                       |  'sink-insert-only' = 'false',\n                       |  'sink-changelog-mode-enforced' = 'I,UA,D'\n                       |)\n                       |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |INSERT INTO pk_snk SELECT * FROM pk_src where name <> 'unknown';\n                    |")).stripMargin()).await();
        .colon.colon expected = new .colon.colon((Object)"+I[1, Ketty]", (List)new .colon.colon((Object)"+I[4, Julia]", (List)Nil$.MODULE$));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("pk_snk")).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    public TableSinkITCase(StreamingWithStateTestBase.StateBackendMode mode) {
        super(mode);
    }
}

