/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.operations;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.error.SqlValidateException;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.legacy.api.TableColumn;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.legacy.api.constraints.UniqueConstraint;
import org.apache.flink.table.operations.CreateTableASOperation;
import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogCommentOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogOptionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogResetOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.expressions.utils.Func0$;
import org.apache.flink.table.planner.expressions.utils.Func1$;
import org.apache.flink.table.planner.expressions.utils.Func8$;
import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
import org.apache.flink.table.planner.operations.SqlNodeToOperationConversionTestBase;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.OperationMatchers;
import org.apache.flink.table.planner.utils.TestSimpleDynamicTableSourceFactory;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.HamcrestCondition;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

public class SqlDdlToOperationConverterTest
extends SqlNodeToOperationConversionTestBase {
    /**
     * Checks the conversion of {@code ALTER CATALOG} statements: {@code SET} (a key given twice
     * keeps the last value), {@code RESET} (including the rejected {@code 'type'} key and the
     * rejected empty key list) and {@code COMMENT}.
     */
    @Test
    public void testAlterCatalog() {
        // SET: 'k2' is listed twice, the expected options only contain the later value.
        String setSql = "ALTER CATALOG cat2 SET ('K1' = 'V1', 'k2' = 'v2', 'k2' = 'v2_new')";
        Map<String, String> expectedOptions = new HashMap<>();
        expectedOptions.put("K1", "V1");
        expectedOptions.put("k2", "v2_new");
        Assertions.assertThat(parse(setSql))
                .isInstanceOf(AlterCatalogOptionsOperation.class)
                .asInstanceOf(InstanceOfAssertFactories.type(AlterCatalogOptionsOperation.class))
                .extracting(
                        AlterCatalogOptionsOperation::getCatalogName,
                        AlterCatalogOptionsOperation::asSummaryString,
                        AlterCatalogOptionsOperation::getProperties)
                .containsExactly(
                        "cat2",
                        "ALTER CATALOG cat2\n  SET 'K1' = 'V1',\n  SET 'k2' = 'v2_new'",
                        expectedOptions);

        // RESET with a regular key.
        Set<String> expectedResetKeys = Collections.singleton("K1");
        Assertions.assertThat(parse("ALTER CATALOG cat2 RESET ('K1')"))
                .isInstanceOf(AlterCatalogResetOperation.class)
                .asInstanceOf(InstanceOfAssertFactories.type(AlterCatalogResetOperation.class))
                .extracting(
                        AlterCatalogResetOperation::getCatalogName,
                        AlterCatalogResetOperation::asSummaryString,
                        AlterCatalogResetOperation::getResetKeys)
                .containsExactly("cat2", "ALTER CATALOG cat2\n  RESET 'K1'", expectedResetKeys);

        // RESET of 'type' and RESET without keys are both expected to fail validation.
        Assertions.assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ('type')"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("ALTER CATALOG RESET does not support changing 'type'");
        Assertions.assertThatThrownBy(() -> parse("ALTER CATALOG cat2 RESET ()"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("ALTER CATALOG RESET does not support empty key");

        // COMMENT: doubled single quotes in the SQL literal become single quotes in the comment.
        Assertions.assertThat(parse("ALTER CATALOG cat2 COMMENT 'comment for catalog ''cat2'''"))
                .isInstanceOf(AlterCatalogCommentOperation.class)
                .asInstanceOf(InstanceOfAssertFactories.type(AlterCatalogCommentOperation.class))
                .extracting(
                        AlterCatalogCommentOperation::getCatalogName,
                        AlterCatalogCommentOperation::asSummaryString,
                        AlterCatalogCommentOperation::getComment)
                .containsExactly(
                        "cat2",
                        "ALTER CATALOG cat2 COMMENT 'comment for catalog ''cat2'''",
                        "comment for catalog 'cat2'");
    }

    /**
     * Converts several {@code CREATE DATABASE} statements and checks, per statement, the catalog
     * name, database name, comment, {@code IF NOT EXISTS} flag and properties carried by the
     * resulting {@link CreateDatabaseOperation}.
     */
    @Test
    public void testCreateDatabase() {
        String[] createDatabaseSqls =
                new String[] {
                    "create database db1",
                    "create database if not exists cat1.db1",
                    "create database cat1.db1 comment 'db1_comment'",
                    "create database cat1.db1 comment 'db1_comment' with ('k1' = 'v1', 'K2' = 'V2')"
                };
        String[] expectedCatalogs = new String[] {"builtin", "cat1", "cat1", "cat1"};
        String expectedDatabase = "db1";
        String[] expectedComments = new String[] {null, null, "db1_comment", "db1_comment"};
        boolean[] expectedIgnoreIfExists = new boolean[] {false, true, false, false};

        Map<String, String> properties = new HashMap<>();
        properties.put("k1", "v1");
        properties.put("K2", "V2");
        // Only the last statement has a WITH clause; the others expect no properties.
        List<Map<String, String>> expectedProperties =
                Arrays.asList(
                        new HashMap<>(),
                        new HashMap<>(),
                        new HashMap<>(),
                        new HashMap<>(properties));

        for (int i = 0; i < createDatabaseSqls.length; ++i) {
            Operation operation = parse(createDatabaseSqls[i]);
            Assertions.assertThat(operation).isInstanceOf(CreateDatabaseOperation.class);
            CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation;
            Assertions.assertThat(createDatabaseOperation.getCatalogName())
                    .isEqualTo(expectedCatalogs[i]);
            Assertions.assertThat(createDatabaseOperation.getDatabaseName())
                    .isEqualTo(expectedDatabase);
            Assertions.assertThat(createDatabaseOperation.getCatalogDatabase().getComment())
                    .isEqualTo(expectedComments[i]);
            Assertions.assertThat(createDatabaseOperation.isIgnoreIfExists())
                    .isEqualTo(expectedIgnoreIfExists[i]);
            Assertions.assertThat(createDatabaseOperation.getCatalogDatabase().getProperties())
                    .isEqualTo(expectedProperties.get(i));
        }
    }

    /**
     * Converts several {@code DROP DATABASE} statements and checks, per statement, the catalog
     * name, database name, {@code IF EXISTS} flag and cascade flag of the resulting {@link
     * DropDatabaseOperation}.
     */
    @Test
    public void testDropDatabase() {
        String[] dropDatabaseSqls =
                new String[] {
                    "drop database db1",
                    "drop database if exists db1",
                    "drop database if exists cat1.db1 CASCADE",
                    "drop database if exists cat1.db1 RESTRICT"
                };
        String[] expectedCatalogs = new String[] {"builtin", "builtin", "cat1", "cat1"};
        String expectedDatabase = "db1";
        boolean[] expectedIfExists = new boolean[] {false, true, true, true};
        // Only the explicit CASCADE statement is expected to report cascade = true.
        boolean[] expectedIsCascades = new boolean[] {false, false, true, false};

        for (int i = 0; i < dropDatabaseSqls.length; ++i) {
            Operation operation = parse(dropDatabaseSqls[i]);
            Assertions.assertThat(operation).isInstanceOf(DropDatabaseOperation.class);
            DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;
            Assertions.assertThat(dropDatabaseOperation.getCatalogName())
                    .isEqualTo(expectedCatalogs[i]);
            Assertions.assertThat(dropDatabaseOperation.getDatabaseName())
                    .isEqualTo(expectedDatabase);
            Assertions.assertThat(dropDatabaseOperation.isIfExists())
                    .isEqualTo(expectedIfExists[i]);
            Assertions.assertThat(dropDatabaseOperation.isCascade())
                    .isEqualTo(expectedIsCascades[i]);
        }
    }

    /**
     * Registers catalog {@code cat1} with database {@code db1} (comment {@code db1_comment}, no
     * properties) and checks that {@code ALTER DATABASE ... SET} yields an {@link
     * AlterDatabaseOperation} carrying the new properties together with the existing comment.
     *
     * @throws Exception if creating the database in the catalog manager fails
     */
    @Test
    public void testAlterDatabase() throws Exception {
        catalogManager.registerCatalog("cat1", new GenericInMemoryCatalog("default", "default"));
        catalogManager.createDatabase(
                "cat1", "db1", new CatalogDatabaseImpl(new HashMap<>(), "db1_comment"), true);

        String sql = "alter database cat1.db1 set ('k1'='v1', 'K2'='V2')";
        Operation operation = parse(sql);
        Assertions.assertThat(operation).isInstanceOf(AlterDatabaseOperation.class);

        Map<String, String> properties = new HashMap<>();
        properties.put("k1", "v1");
        properties.put("K2", "V2");

        AlterDatabaseOperation alterDatabaseOperation = (AlterDatabaseOperation) operation;
        Assertions.assertThat(alterDatabaseOperation.getDatabaseName()).isEqualTo("db1");
        Assertions.assertThat(alterDatabaseOperation.getCatalogName()).isEqualTo("cat1");
        Assertions.assertThat(alterDatabaseOperation.getCatalogDatabase().getComment())
                .isEqualTo("db1_comment");
        Assertions.assertThat(alterDatabaseOperation.getCatalogDatabase().getProperties())
                .isEqualTo(properties);
    }

    /**
     * Converts a partitioned {@code CREATE TABLE} statement and checks the partition keys, field
     * names, field data types and the comment of the first column of the resulting table.
     */
    @Test
    public void testCreateTable() {
        String sql =
                "CREATE TABLE tbl1 (\n"
                        + "  a bigint comment 'column a',\n"
                        + "  b varchar, \n"
                        + "  c int, \n"
                        + "  d varchar)\n"
                        + "  PARTITIONED BY (a, d)\n"
                        + "  with (\n"
                        + "    'connector' = 'kafka', \n"
                        + "    'kafka.topic' = 'log.test'\n"
                        + ")\n";
        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
        Operation operation = parse(sql, planner, parser);
        Assertions.assertThat(operation).isInstanceOf(CreateTableOperation.class);

        CreateTableOperation op = (CreateTableOperation) operation;
        ResolvedCatalogTable catalogTable = op.getCatalogTable();
        Assertions.assertThat(catalogTable.getPartitionKeys())
                .hasSameElementsAs(Arrays.asList("a", "d"));
        Assertions.assertThat(catalogTable.getSchema().getFieldNames())
                .isEqualTo(new String[] {"a", "b", "c", "d"});
        Assertions.assertThat(catalogTable.getSchema().getFieldDataTypes())
                .isEqualTo(
                        new DataType[] {
                            DataTypes.BIGINT(),
                            DataTypes.VARCHAR(Integer.MAX_VALUE),
                            DataTypes.INT(),
                            DataTypes.VARCHAR(Integer.MAX_VALUE)
                        });
        // Require the first column to exist: with Optional.ifPresent the comment check would be
        // skipped silently (and the test would pass) if the column were missing.
        Assertions.assertThat(catalogTable.getResolvedSchema().getColumn(0))
                .hasValueSatisfying(
                        column ->
                                Assertions.assertThat(column.getComment())
                                        .isEqualTo(Optional.of("column a")));
    }

    /**
     * Converts a {@code CREATE TABLE} statement with a named, not enforced primary key on {@code
     * (a, b)} and checks the constraint summary, the field names and that the key columns are
     * reported as {@code NOT NULL} while the remaining columns stay nullable.
     */
    @Test
    public void testCreateTableWithPrimaryKey() {
        String sql =
                "CREATE TABLE tbl1 (\n"
                        + "  a bigint,\n"
                        + "  b varchar, \n"
                        + "  c int, \n"
                        + "  d varchar, \n"
                        + "  constraint ct1 primary key(a, b) not enforced\n"
                        + ") with (\n"
                        + "  'connector' = 'kafka', \n"
                        + "  'kafka.topic' = 'log.test'\n"
                        + ")\n";
        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
        Operation operation = parse(sql, planner, parser);
        Assertions.assertThat(operation).isInstanceOf(CreateTableOperation.class);

        CreateTableOperation op = (CreateTableOperation) operation;
        ResolvedCatalogTable catalogTable = op.getCatalogTable();
        TableSchema tableSchema = catalogTable.getSchema();
        // "fakeVal" makes the comparison fail with a readable value if no primary key is present.
        Assertions.assertThat(
                        tableSchema
                                .getPrimaryKey()
                                .map(UniqueConstraint::asSummaryString)
                                .orElse("fakeVal"))
                .isEqualTo("CONSTRAINT ct1 PRIMARY KEY (a, b)");
        Assertions.assertThat(tableSchema.getFieldNames())
                .isEqualTo(new String[] {"a", "b", "c", "d"});
        Assertions.assertThat(tableSchema.getFieldDataTypes())
                .isEqualTo(
                        new DataType[] {
                            DataTypes.BIGINT().notNull(),
                            DataTypes.STRING().notNull(),
                            DataTypes.INT(),
                            DataTypes.STRING()
                        });
    }

    /**
     * Expects a {@link ValidationException} when a primary key references the computed column
     * {@code c}; the asserted message includes the position of that column in the statement.
     */
    @Test
    public void testPrimaryKeyOnGeneratedColumn() {
        // The asserted message contains a line/column position, so the layout of this statement
        // must not change.
        String sql =
                "CREATE TABLE tbl1 (\n"
                        + "  a bigint not null,\n"
                        + "  b varchar not null,\n"
                        + "  c as 2 * (a + 1),\n"
                        + "  constraint ct1 primary key (b, c) not enforced) with (\n"
                        + "    'connector' = 'kafka',\n"
                        + "    'kafka.topic' = 'log.test'\n"
                        + ")\n";
        Assertions.assertThatThrownBy(() -> parseAndConvert(sql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Could not create a PRIMARY KEY with column 'c' at line 5, column 34.\n"
                                + "A PRIMARY KEY constraint must be declared on physical columns.");
    }

    /**
     * Expects a {@link ValidationException} when a primary key references column {@code d}, which
     * the statement does not declare; the asserted message includes the column position.
     */
    @Test
    public void testPrimaryKeyNonExistentColumn() {
        // The asserted message contains a line/column position, so the layout of this statement
        // must not change.
        String sql =
                "CREATE TABLE tbl1 (\n"
                        + "  a bigint not null,\n"
                        + "  b varchar not null,\n"
                        + "  c as 2 * (a + 1),\n"
                        + "  constraint ct1 primary key (b, d) not enforced) with (\n"
                        + "    'connector' = 'kafka',\n"
                        + "    'kafka.topic' = 'log.test'\n"
                        + ")\n";
        Assertions.assertThatThrownBy(() -> parseAndConvert(sql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Primary key column 'd' is not defined in the schema at line 5, column 34");
    }

    /**
     * Checks that option keys containing {@code -}, {@code .} and {@code *} are carried into the
     * options of the created table with their original spelling.
     */
    @Test
    public void testCreateTableWithMinusInOptionKey() {
        String sql =
                "create table source_table(\n"
                        + "  a int,\n"
                        + "  b bigint,\n"
                        + "  c varchar\n"
                        + ") with (\n"
                        + "  'a-B-c-d124' = 'Ab',\n"
                        + "  'a.b-c-d.e-f.g' = 'ada',\n"
                        + "  'a.b-c-d.e-f1231.g' = 'ada',\n"
                        + "  'a.b-c-d.*' = 'adad')\n";
        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
        SqlNode node = parser.parse(sql);
        Assertions.assertThat(node).isInstanceOf(SqlCreateTable.class);

        Operation operation =
                SqlNodeToOperationConversion.convert(planner, catalogManager, node).get();
        Assertions.assertThat(operation).isInstanceOf(CreateTableOperation.class);

        CreateTableOperation op = (CreateTableOperation) operation;
        ResolvedCatalogTable catalogTable = op.getCatalogTable();
        // Sort by key so the string form is deterministic and can be compared literally.
        Map<String, String> sortedProperties = new TreeMap<>(catalogTable.getOptions());
        String expected =
                "{a-B-c-d124=Ab, a.b-c-d.*=adad, a.b-c-d.e-f.g=ada, a.b-c-d.e-f1231.g=ada}";
        Assertions.assertThat(sortedProperties.toString()).isEqualTo(expected);
    }

    /**
     * Registers the catalog function {@code default.myfunc} and checks the properties produced for
     * a {@code CREATE TABLE} statement whose watermark expression calls that function; the
     * expected watermark expression uses the fully qualified function name.
     *
     * @throws FunctionAlreadyExistException declared by the catalog's {@code createFunction}
     * @throws DatabaseNotExistException declared by the catalog's {@code createFunction}
     */
    @Test
    public void testCreateTableWithWatermark()
            throws FunctionAlreadyExistException, DatabaseNotExistException {
        CatalogFunction cf =
                new CatalogFunctionImpl(JavaUserDefinedScalarFunctions.JavaFunc5.class.getName());
        catalog.createFunction(ObjectPath.fromString("default.myfunc"), cf, true);

        String sql =
                "create table source_table(\n"
                        + "  a int,\n"
                        + "  b bigint,\n"
                        + "  c timestamp(3),\n"
                        + "  watermark for `c` as myfunc(c, 1) - interval '5' second\n"
                        + ") with (\n"
                        + "  'connector.type' = 'kafka')\n";
        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
        SqlNode node = parser.parse(sql);
        Assertions.assertThat(node).isInstanceOf(SqlCreateTable.class);

        Operation operation =
                SqlNodeToOperationConversion.convert(planner, catalogManager, node).get();
        Assertions.assertThat(operation).isInstanceOf(CreateTableOperation.class);

        CreateTableOperation op = (CreateTableOperation) operation;
        ResolvedCatalogTable catalogTable = op.getCatalogTable();
        Map<String, String> properties = catalogTable.toProperties();

        Map<String, String> expected = new HashMap<>();
        expected.put("schema.0.name", "a");
        expected.put("schema.0.data-type", "INT");
        expected.put("schema.1.name", "b");
        expected.put("schema.1.data-type", "BIGINT");
        expected.put("schema.2.name", "c");
        expected.put("schema.2.data-type", "TIMESTAMP(3)");
        expected.put("schema.watermark.0.rowtime", "c");
        expected.put(
                "schema.watermark.0.strategy.expr",
                "`builtin`.`default`.`myfunc`(`c`, 1) - INTERVAL '5' SECOND");
        expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
        expected.put("connector.type", "kafka");
        Assertions.assertThat(properties).isEqualTo(expected);
    }

    /**
     * Creates {@code sourceTable} and checks that {@code CREATE TABLE ... LIKE sourceTable}
     * without explicit merging options yields the source columns followed by the new column, the
     * declared watermark, the options of both tables and the declared partition keys.
     */
    @Test
    public void testBasicCreateTableLike() {
        Map<String, String> sourceProperties = new HashMap<>();
        sourceProperties.put("format.type", "json");
        CatalogTable catalogTable =
                CatalogTable.newBuilder()
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT().notNull())
                                        .column("f1", DataTypes.TIMESTAMP(3))
                                        .build())
                        .options(sourceProperties)
                        .build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);

        String sql =
                "create table derivedTable(\n"
                        + "  a int,\n"
                        + "  watermark for f1 as `f1` - interval '5' second\n"
                        + ")\n"
                        + "PARTITIONED BY (a, f0)\n"
                        + "with (\n"
                        + "  'connector.type' = 'kafka')\n"
                        + "like sourceTable";
        Operation operation = parseAndConvert(sql);

        Assertions.assertThat(operation)
                .is(
                        new HamcrestCondition<>(
                                OperationMatchers.isCreateTableOperation(
                                        OperationMatchers.withSchema(
                                                Schema.newBuilder()
                                                        .column("f0", DataTypes.INT().notNull())
                                                        .column("f1", DataTypes.TIMESTAMP(3))
                                                        .column("a", DataTypes.INT())
                                                        .watermark(
                                                                "f1", "`f1` - INTERVAL '5' SECOND")
                                                        .build()),
                                        OperationMatchers.withOptions(
                                                OperationMatchers.entry("connector.type", "kafka"),
                                                OperationMatchers.entry("format.type", "json")),
                                        OperationMatchers.partitionedBy("a", "f0"))));
    }

    /**
     * Checks that {@code CREATE TABLE ... LIKE} accepts a fully qualified source table identifier
     * and that the derived table has the schema and options of the source table.
     */
    @Test
    public void testCreateTableLikeWithFullPath() {
        Map<String, String> sourceProperties = new HashMap<>();
        sourceProperties.put("connector.type", "kafka");
        sourceProperties.put("format.type", "json");
        CatalogTable catalogTable =
                CatalogTable.newBuilder()
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT().notNull())
                                        .column("f1", DataTypes.TIMESTAMP(3))
                                        .build())
                        .options(sourceProperties)
                        .build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);

        String sql = "create table mytable like `builtin`.`default`.sourceTable";
        Operation operation = parseAndConvert(sql);

        Assertions.assertThat(operation)
                .is(
                        new HamcrestCondition<>(
                                OperationMatchers.isCreateTableOperation(
                                        OperationMatchers.withSchema(
                                                Schema.newBuilder()
                                                        .column("f0", DataTypes.INT().notNull())
                                                        .column("f1", DataTypes.TIMESTAMP(3))
                                                        .build()),
                                        OperationMatchers.withOptions(
                                                OperationMatchers.entry("connector.type", "kafka"),
                                                OperationMatchers.entry("format.type", "json")))));
    }

    /**
     * Checks {@code CREATE TABLE ... LIKE} with explicit merging options: the source table's
     * computed column and partition keys are excluded, while its options and watermark are
     * overwritten by the ones declared in the derived table.
     */
    @Test
    public void testMergingCreateTableLike() {
        Map<String, String> sourceProperties = new HashMap<>();
        sourceProperties.put("format.type", "json");
        CatalogTable catalogTable =
                CatalogTable.newBuilder()
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT().notNull())
                                        .column("f1", DataTypes.TIMESTAMP(3))
                                        .columnByExpression("f2", "`f0` + 12345")
                                        .watermark("f1", "`f1` - interval '1' second")
                                        .build())
                        .partitionKeys(Arrays.asList("f0", "f1"))
                        .options(sourceProperties)
                        .build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);

        String sql =
                "create table derivedTable(\n"
                        + "  a int,\n"
                        + "  watermark for f1 as `f1` - interval '5' second\n"
                        + ")\n"
                        + "PARTITIONED BY (a, f0)\n"
                        + "with (\n"
                        + "  'connector.type' = 'kafka')\n"
                        + "like sourceTable (\n"
                        + "   EXCLUDING GENERATED\n"
                        + "   EXCLUDING PARTITIONS\n"
                        + "   OVERWRITING OPTIONS\n"
                        + "   OVERWRITING WATERMARKS)";
        Operation operation = parseAndConvert(sql);

        Assertions.assertThat(operation)
                .is(
                        new HamcrestCondition<>(
                                OperationMatchers.isCreateTableOperation(
                                        OperationMatchers.withSchema(
                                                Schema.newBuilder()
                                                        .column("f0", DataTypes.INT().notNull())
                                                        .column("f1", DataTypes.TIMESTAMP(3))
                                                        .column("a", DataTypes.INT())
                                                        .watermark(
                                                                "f1", "`f1` - INTERVAL '5' SECOND")
                                                        .build()),
                                        OperationMatchers.withOptions(
                                                OperationMatchers.entry("connector.type", "kafka"),
                                                OperationMatchers.entry("format.type", "json")),
                                        OperationMatchers.partitionedBy("a", "f0"))));
    }

    /**
     * Checks {@code CREATE TABLE ... LIKE} with {@code EXCLUDING DISTRIBUTION}: the hash
     * distribution of the source table is not taken over, and the derived table reports the
     * distribution declared in its own {@code DISTRIBUTED BY} clause.
     */
    @Test
    public void testMergingCreateTableLikeExcludingDistribution() {
        Map<String, String> sourceProperties = new HashMap<>();
        sourceProperties.put("format.type", "json");
        CatalogTable catalogTable =
                CatalogTable.newBuilder()
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT().notNull())
                                        .column("f1", DataTypes.TIMESTAMP(3))
                                        .columnByExpression("f2", "`f0` + 12345")
                                        .watermark("f1", "`f1` - interval '1' second")
                                        .build())
                        .distribution(TableDistribution.ofHash(Collections.singletonList("f0"), 3))
                        .partitionKeys(Arrays.asList("f0", "f1"))
                        .options(sourceProperties)
                        .build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);

        String sql =
                "create table derivedTable(\n"
                        + "  a int,\n"
                        + "  watermark for f1 as `f1` - interval '5' second\n"
                        + ")\n"
                        + "DISTRIBUTED BY (a, f0)\n"
                        + "with (\n"
                        + "  'connector.type' = 'kafka')\n"
                        + "like sourceTable (\n"
                        + "   EXCLUDING GENERATED\n"
                        + "   EXCLUDING DISTRIBUTION\n"
                        + "   EXCLUDING PARTITIONS\n"
                        + "   OVERWRITING OPTIONS\n"
                        + "   OVERWRITING WATERMARKS)";
        Operation operation = parseAndConvert(sql);

        Assertions.assertThat(operation)
                .is(
                        new HamcrestCondition<>(
                                OperationMatchers.isCreateTableOperation(
                                        OperationMatchers.withDistribution(
                                                TableDistribution.ofUnknown(
                                                        Arrays.asList("a", "f0"), null)),
                                        OperationMatchers.withSchema(
                                                Schema.newBuilder()
                                                        .column("f0", DataTypes.INT().notNull())
                                                        .column("f1", DataTypes.TIMESTAMP(3))
                                                        .column("a", DataTypes.INT())
                                                        .watermark(
                                                                "f1", "`f1` - INTERVAL '5' SECOND")
                                                        .build()),
                                        OperationMatchers.withOptions(
                                                OperationMatchers.entry("connector.type", "kafka"),
                                                OperationMatchers.entry("format.type", "json")))));
    }

    /**
     * Checks that {@code DISTRIBUTED BY (a)} on a declared column yields a create-table operation
     * whose distribution has bucket key {@code a}, unknown kind and no bucket count.
     */
    @Test
    public void testCreateTableValidDistribution() {
        String sql = "create table derivedTable(\n" + "  a int\n" + ")\n" + "DISTRIBUTED BY (a)";
        Operation operation = parseAndConvert(sql);

        Assertions.assertThat(operation)
                .is(
                        new HamcrestCondition<>(
                                OperationMatchers.isCreateTableOperation(
                                        OperationMatchers.withDistribution(
                                                TableDistribution.ofUnknown(
                                                        Collections.singletonList("a"), null)))));
    }

    /**
     * Expects a {@link ValidationException} when {@code DISTRIBUTED BY} names a column ({@code
     * f3}) that the table does not declare.
     */
    @Test
    public void testCreateTableInvalidDistribution() {
        String sql = "create table derivedTable(\n" + "  a int\n" + ")\n" + "DISTRIBUTED BY (f3)";
        Assertions.assertThatThrownBy(() -> parseAndConvert(sql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid bucket key 'f3'. A bucket key for a distribution must reference a physical column in the schema. Available columns are: [a]");
    }

    /**
     * Checks that {@code CREATE TABLE tbl1 (f1, f0) AS SELECT * FROM src1} produces a create-table
     * operation without distribution whose schema lists the columns in the order given in the
     * statement ({@code f1}, {@code f0}) rather than in the source table's order.
     */
    @Test
    public void tesCreateTableAsWithOrderingColumns() {
        CatalogTable catalogTable =
                CatalogTable.newBuilder()
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT().notNull())
                                        .column("f1", DataTypes.TIMESTAMP(3))
                                        .build())
                        .options(
                                Map.of("connector", TestSimpleDynamicTableSourceFactory.IDENTIFIER()))
                        .build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "src1"), false);

        String sql = "create table tbl1 (f1, f0) AS SELECT * FROM src1";
        Operation ctas = parseAndConvert(sql);
        // The CTAS operation wraps the create-table operation that is inspected below.
        CreateTableOperation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();

        Assertions.assertThat(operation)
                .is(
                        new HamcrestCondition<>(
                                OperationMatchers.isCreateTableOperation(
                                        OperationMatchers.withNoDistribution(),
                                        OperationMatchers.withSchema(
                                                Schema.newBuilder()
                                                        .column("f1", DataTypes.TIMESTAMP(3))
                                                        .column("f0", DataTypes.INT().notNull())
                                                        .build()))));
    }

    /**
     * Expects a {@link ValidationException} when {@code PARTITIONED BY} names a column ({@code
     * f3}) that the table does not declare.
     */
    @Test
    public void testCreateTableInvalidPartition() {
        String sql = "create table derivedTable(\n" + "  a int\n" + ")\n" + "PARTITIONED BY (f3)";
        Assertions.assertThatThrownBy(() -> parseAndConvert(sql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Partition column 'f3' not defined in the table schema. Available columns: ['a']");
    }

    /**
     * Expects a {@link ValidationException} when a {@code CREATE TABLE ... LIKE} statement
     * partitions by a column ({@code f3}) found neither in the source table nor in the derived
     * table; the asserted message lists the columns of both tables.
     */
    @Test
    public void testCreateTableLikeInvalidPartition() {
        CatalogTable catalogTable =
                CatalogTable.newBuilder()
                        .schema(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build())
                        .build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);

        String sql =
                "create table derivedTable(\n"
                        + "  a int\n"
                        + ")\n"
                        + "PARTITIONED BY (f3)\n"
                        + "like sourceTable";
        Assertions.assertThatThrownBy(() -> parseAndConvert(sql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Partition column 'f3' not defined in the table schema. Available columns: ['f0', 'a']");
    }

    /**
     * Expects a {@link ValidationException} when a watermark is declared on a field ({@code f1})
     * that the table does not declare; the asserted message includes the field position.
     */
    @Test
    public void testCreateTableInvalidWatermark() {
        // The asserted message contains a line/column position, so the layout of this statement
        // must not change.
        String sql =
                "create table derivedTable(\n"
                        + "  a int,\n"
                        + "  watermark for f1 as `f1` - interval '5' second\n"
                        + ")";
        Assertions.assertThatThrownBy(() -> parseAndConvert(sql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The rowtime attribute field 'f1' is not defined in the table schema, at line 3, column 17\n"
                                + "Available fields: ['a']");
    }

    /**
     * Expects a {@link ValidationException} when a {@code CREATE TABLE ... LIKE} statement
     * declares a watermark on a field ({@code f1}) found neither in the source table nor in the
     * derived table; the asserted message lists the fields of both tables.
     */
    @Test
    public void testCreateTableLikeInvalidWatermark() {
        CatalogTable catalogTable =
                CatalogTable.newBuilder()
                        .schema(Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build())
                        .build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);

        // The asserted message contains a line/column position, so the layout of this statement
        // must not change.
        String sql =
                "create table derivedTable(\n"
                        + "  a int,\n"
                        + "  watermark for f1 as `f1` - interval '5' second\n"
                        + ")\n"
                        + "like sourceTable";
        Assertions.assertThatThrownBy(() -> parseAndConvert(sql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The rowtime attribute field 'f1' is not defined in the table schema, at line 3, column 17\n"
                                + "Available fields: ['f0', 'a']");
    }

    /**
     * Expects a {@link ValidationException} when a {@code CREATE TABLE ... LIKE} statement
     * declares a watermark on the nested field {@code f1.t} while the source table's row column
     * {@code f1} only contains a field named {@code tmstmp}.
     */
    @Test
    public void testCreateTableLikeNestedWatermark() {
        CatalogTable catalogTable =
                CatalogTable.newBuilder()
                        .schema(
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT().notNull())
                                        .column(
                                                "f1",
                                                DataTypes.ROW(
                                                        DataTypes.FIELD(
                                                                "tmstmp", DataTypes.TIMESTAMP(3))))
                                        .build())
                        .build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);

        // The asserted message contains a line/column position, so the layout of this statement
        // must not change.
        String sql =
                "create table derivedTable(\n"
                        + "  a int,\n"
                        + "  watermark for f1.t as f1.t - interval '5' second\n"
                        + ")\n"
                        + "like sourceTable";
        Assertions.assertThatThrownBy(() -> parseAndConvert(sql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The rowtime attribute field 'f1.t' is not defined in the table schema, at line 3, column 20\n"
                                + "Nested field 't' was not found in a composite type: ROW<`tmstmp` TIMESTAMP(3)>.");
    }

    /**
     * Builds one CREATE TABLE statement with a column per SQL type expression (columns are named
     * {@code f0}, {@code f1}, ... in list order), converts it with the default dialect, and
     * compares the resulting field data types against the expected {@link DataTypes} values.
     */
    @Test
    public void testCreateTableWithFullDataTypes() {
        // Expected type shared by every "ROW of f0 INT, f1 BOOLEAN" expression below.
        DataType intBooleanRow =
                DataTypes.ROW(
                        DataTypes.FIELD("f0", DataTypes.INT()),
                        DataTypes.FIELD("f1", DataTypes.BOOLEAN()));

        List<TestItem> testItems =
                Arrays.asList(
                        // character and binary strings
                        SqlDdlToOperationConverterTest.createTestItem("CHAR", DataTypes.CHAR(1)),
                        SqlDdlToOperationConverterTest.createTestItem("CHAR NOT NULL", DataTypes.CHAR(1).notNull()),
                        SqlDdlToOperationConverterTest.createTestItem("CHAR NULL", DataTypes.CHAR(1)),
                        SqlDdlToOperationConverterTest.createTestItem("CHAR(33)", DataTypes.CHAR(33)),
                        SqlDdlToOperationConverterTest.createTestItem("VARCHAR", DataTypes.STRING()),
                        SqlDdlToOperationConverterTest.createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)),
                        SqlDdlToOperationConverterTest.createTestItem("STRING", DataTypes.STRING()),
                        SqlDdlToOperationConverterTest.createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
                        SqlDdlToOperationConverterTest.createTestItem("BINARY", DataTypes.BINARY(1)),
                        SqlDdlToOperationConverterTest.createTestItem("BINARY(33)", DataTypes.BINARY(33)),
                        SqlDdlToOperationConverterTest.createTestItem("VARBINARY", DataTypes.BYTES()),
                        SqlDdlToOperationConverterTest.createTestItem("VARBINARY(33)", DataTypes.VARBINARY(33)),
                        SqlDdlToOperationConverterTest.createTestItem("BYTES", DataTypes.BYTES()),
                        // exact and approximate numerics
                        SqlDdlToOperationConverterTest.createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)),
                        SqlDdlToOperationConverterTest.createTestItem("DEC", DataTypes.DECIMAL(10, 0)),
                        SqlDdlToOperationConverterTest.createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)),
                        SqlDdlToOperationConverterTest.createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)),
                        SqlDdlToOperationConverterTest.createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)),
                        SqlDdlToOperationConverterTest.createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)),
                        SqlDdlToOperationConverterTest.createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 3)),
                        SqlDdlToOperationConverterTest.createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)),
                        SqlDdlToOperationConverterTest.createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 3)),
                        SqlDdlToOperationConverterTest.createTestItem("TINYINT", DataTypes.TINYINT()),
                        SqlDdlToOperationConverterTest.createTestItem("SMALLINT", DataTypes.SMALLINT()),
                        SqlDdlToOperationConverterTest.createTestItem("INTEGER", DataTypes.INT()),
                        SqlDdlToOperationConverterTest.createTestItem("INT", DataTypes.INT()),
                        SqlDdlToOperationConverterTest.createTestItem("BIGINT", DataTypes.BIGINT()),
                        SqlDdlToOperationConverterTest.createTestItem("FLOAT", DataTypes.FLOAT()),
                        SqlDdlToOperationConverterTest.createTestItem("DOUBLE", DataTypes.DOUBLE()),
                        SqlDdlToOperationConverterTest.createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
                        // date and time
                        SqlDdlToOperationConverterTest.createTestItem("DATE", DataTypes.DATE()),
                        SqlDdlToOperationConverterTest.createTestItem("TIME", DataTypes.TIME()),
                        SqlDdlToOperationConverterTest.createTestItem("TIME WITHOUT TIME ZONE", DataTypes.TIME()),
                        SqlDdlToOperationConverterTest.createTestItem("TIME(3)", DataTypes.TIME()),
                        SqlDdlToOperationConverterTest.createTestItem("TIME(3) WITHOUT TIME ZONE", DataTypes.TIME()),
                        SqlDdlToOperationConverterTest.createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(6)),
                        SqlDdlToOperationConverterTest.createTestItem("TIMESTAMP WITHOUT TIME ZONE", DataTypes.TIMESTAMP(6)),
                        SqlDdlToOperationConverterTest.createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
                        SqlDdlToOperationConverterTest.createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "TIMESTAMP WITH LOCAL TIME ZONE",
                                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "TIMESTAMP(3) WITH LOCAL TIME ZONE",
                                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
                        // arrays, multisets and maps
                        SqlDdlToOperationConverterTest.createTestItem(
                                "ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
                                DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "ARRAY<INT NOT NULL>", DataTypes.ARRAY(DataTypes.INT().notNull())),
                        SqlDdlToOperationConverterTest.createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "INT NOT NULL ARRAY", DataTypes.ARRAY(DataTypes.INT().notNull())),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "INT ARRAY NOT NULL", DataTypes.ARRAY(DataTypes.INT()).notNull()),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "MULTISET<INT NOT NULL>",
                                DataTypes.MULTISET(DataTypes.INT().notNull())),
                        SqlDdlToOperationConverterTest.createTestItem("INT MULTISET", DataTypes.MULTISET(DataTypes.INT())),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "INT NOT NULL MULTISET",
                                DataTypes.MULTISET(DataTypes.INT().notNull())),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "INT MULTISET NOT NULL",
                                DataTypes.MULTISET(DataTypes.INT()).notNull()),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "MAP<BIGINT, BOOLEAN>",
                                DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
                        // rows; note the expected f0 type is plain INT even where the
                        // expression says INT NOT NULL
                        SqlDdlToOperationConverterTest.createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>", intBooleanRow),
                        SqlDdlToOperationConverterTest.createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)", intBooleanRow),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "ROW<`f0` INT>",
                                DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "ROW(`f0` INT)",
                                DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
                        SqlDdlToOperationConverterTest.createTestItem("ROW<>", DataTypes.ROW()),
                        SqlDdlToOperationConverterTest.createTestItem("ROW()", DataTypes.ROW()),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "ROW<f0 INT NOT NULL 'This is a comment.', f1 BOOLEAN 'This as well.'>",
                                intBooleanRow),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "ARRAY<ROW<f0 INT, f1 BOOLEAN>>", DataTypes.ARRAY(intBooleanRow)),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "ROW<f0 INT, f1 BOOLEAN> MULTISET",
                                DataTypes.MULTISET(intBooleanRow)),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
                                DataTypes.MULTISET(intBooleanRow)),
                        SqlDdlToOperationConverterTest.createTestItem(
                                "ROW<f0 Row<f00 INT, f01 BOOLEAN>, f1 INT ARRAY, f2 BOOLEAN MULTISET>",
                                DataTypes.ROW(
                                        DataTypes.FIELD(
                                                "f0",
                                                DataTypes.ROW(
                                                        DataTypes.FIELD("f00", DataTypes.INT()),
                                                        DataTypes.FIELD("f01", DataTypes.BOOLEAN()))),
                                        DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
                                        DataTypes.FIELD(
                                                "f2", DataTypes.MULTISET(DataTypes.BOOLEAN())))));

        // Assemble "create table t1(\n f0 <expr>,\n f1 <expr>, ... )".
        StringBuilder ddl = new StringBuilder("create table t1(\n");
        int lastIndex = testItems.size() - 1;
        for (int idx = 0; idx <= lastIndex; idx++) {
            ddl.append("f").append(idx).append(" ").append(testItems.get(idx).testExpr);
            ddl.append(idx == lastIndex ? ")" : ",\n");
        }

        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
        SqlNode parsed = parser.parse(ddl.toString());
        Assertions.assertThat(parsed).isInstanceOf(SqlCreateTable.class);

        Operation converted =
                SqlNodeToOperationConversion.convert(planner, catalogManager, parsed).get();
        TableSchema actualSchema = ((CreateTableOperation) converted).getCatalogTable().getSchema();

        Object[] expectedTypes = testItems.stream().map(item -> item.expectedType).toArray();
        Assertions.assertThat(actualSchema.getFieldDataTypes()).isEqualTo(expectedTypes);
    }

    /**
     * Registers three temporary scalar functions, converts a CREATE TABLE with five computed
     * columns, and checks the resulting field names, field types and the stored computed-column
     * expressions (which are expected with fully qualified function identifiers).
     */
    @Test
    public void testCreateTableWithComputedColumn() {
        final String ddl =
                "CREATE TABLE tbl1 (\n  a int,\n  b varchar, \n  c as a - 1, \n  d as b || '$$', \n  e as my_udf1(a),  f as `default`.my_udf2(a) + 1,  g as builtin.`default`.my_udf3(a) || '##'\n)\n  with (\n    'connector' = 'kafka', \n    'kafka.topic' = 'log.test'\n)\n";

        functionCatalog.registerTempCatalogScalarFunction(
                ObjectIdentifier.of("builtin", "default", "my_udf1"), Func0$.MODULE$);
        functionCatalog.registerTempCatalogScalarFunction(
                ObjectIdentifier.of("builtin", "default", "my_udf2"), Func1$.MODULE$);
        functionCatalog.registerTempCatalogScalarFunction(
                ObjectIdentifier.of("builtin", "default", "my_udf3"), Func8$.MODULE$);

        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        Operation converted = parse(ddl, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(converted).isInstanceOf(CreateTableOperation.class);

        ResolvedCatalogTable createdTable = ((CreateTableOperation) converted).getCatalogTable();
        Assertions.assertThat(createdTable.getSchema().getFieldNames())
                .isEqualTo(new String[] {"a", "b", "c", "d", "e", "f", "g"});
        Assertions.assertThat(createdTable.getSchema().getFieldDataTypes())
                .isEqualTo(
                        new DataType[] {
                            DataTypes.INT(),
                            DataTypes.STRING(),
                            DataTypes.INT(),
                            DataTypes.STRING(),
                            DataTypes.INT().notNull(),
                            DataTypes.INT(),
                            DataTypes.STRING()
                        });

        // Keep only the computed columns and pull out their expression strings.
        String[] actualExpressions =
                createdTable.getSchema().getTableColumns().stream()
                        .filter(column -> column instanceof TableColumn.ComputedColumn)
                        .map(column -> ((TableColumn.ComputedColumn) column).getExpression())
                        .toArray(String[]::new);
        String[] expectedExpressions = {
            "`a` - 1",
            "`b` || '$$'",
            "`builtin`.`default`.`my_udf1`(`a`)",
            "`builtin`.`default`.`my_udf2`(`a`) + 1",
            "`builtin`.`default`.`my_udf3`(`a`) || '##'"
        };
        Assertions.assertThat(actualExpressions).isEqualTo(expectedExpressions);
    }

    /**
     * Converts a CREATE TABLE containing plain, keyed ({@code FROM 'other.key'}) and virtual
     * METADATA columns and checks that the resulting schema equals the hand-built expectation.
     */
    @Test
    public void testCreateTableWithMetadataColumn() {
        final String ddl =
                "CREATE TABLE tbl1 (\n  a INT,\n  b STRING,\n  c INT METADATA,\n  d INT METADATA FROM 'other.key',\n  e INT METADATA VIRTUAL\n)\n  WITH (\n    'connector' = 'kafka',\n    'kafka.topic' = 'log.test'\n)\n";

        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
        Operation converted = parse(ddl, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(converted).isInstanceOf(CreateTableOperation.class);

        TableSchema actualSchema = ((CreateTableOperation) converted).getCatalogTable().getSchema();
        TableSchema expectedSchema =
                TableSchema.builder()
                        .add(TableColumn.physical("a", DataTypes.INT()))
                        .add(TableColumn.physical("b", DataTypes.STRING()))
                        .add(TableColumn.metadata("c", DataTypes.INT()))
                        .add(TableColumn.metadata("d", DataTypes.INT(), "other.key"))
                        .add(TableColumn.metadata("e", DataTypes.INT(), true))
                        .build();
        Assertions.assertThat(actualSchema).isEqualTo(expectedSchema);
    }

    /**
     * Checks conversion of two CREATE FUNCTION statements: a catalog function (operation type,
     * summary string and resulting {@link CatalogFunction}) and a temporary system function
     * (operation type and summary string).
     */
    @Test
    public void testCreateFunction() {
        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);

        // Catalog function declared with LANGUAGE JAVA and a jar resource.
        String catalogFunctionSql =
                "CREATE FUNCTION test_udf AS 'org.apache.fink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar'";
        Operation catalogFunctionOp =
                parse(catalogFunctionSql, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(catalogFunctionOp).isInstanceOf(CreateCatalogFunctionOperation.class);
        CatalogFunction actualFunction =
                ((CreateCatalogFunctionOperation) catalogFunctionOp).getCatalogFunction();
        Assertions.assertThat(catalogFunctionOp.asSummaryString())
                .isEqualTo(
                        "CREATE CATALOG FUNCTION: (catalogFunction: [Optional[This is a user-defined function]], identifier: [`builtin`.`default`.`test_udf`], ignoreIfExists: [false], isTemporary: [false])");
        CatalogFunction expectedFunction =
                new CatalogFunctionImpl(
                        "org.apache.fink.function.function1",
                        FunctionLanguage.JAVA,
                        Collections.singletonList(
                                new ResourceUri(ResourceType.JAR, "file:///path/to/test.jar")));
        Assertions.assertThat(actualFunction).isEqualTo(expectedFunction);

        // Temporary system function declared with LANGUAGE SCALA.
        String tempSystemFunctionSql =
                "CREATE TEMPORARY SYSTEM FUNCTION test_udf2 AS 'org.apache.fink.function.function2' LANGUAGE SCALA USING JAR 'file:///path/to/test.jar'";
        Operation tempSystemFunctionOp =
                parse(tempSystemFunctionSql, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
        Assertions.assertThat(tempSystemFunctionOp)
                .isInstanceOf(CreateTempSystemFunctionOperation.class);
        Assertions.assertThat(tempSystemFunctionOp.asSummaryString())
                .isEqualTo(
                        "CREATE TEMPORARY SYSTEM FUNCTION: (functionName: [test_udf2], catalogFunction: [CatalogFunctionImpl{className='org.apache.fink.function.function2', functionLanguage='SCALA', functionResource='[ResourceUri{resourceType=JAR, uri='file:///path/to/test.jar'}]'}], ignoreIfExists: [false], functionLanguage: [SCALA])");
    }

    /**
     * Covers ALTER TABLE RENAME TO, SET and RESET: three equivalent rename spellings, the
     * resulting options and table changes for SET/RESET, the non-existent-table cases, and the
     * rejected RESET of {@code 'connector'} and of an empty key list.
     */
    @Test
    public void testAlterTable() throws Exception {
        prepareTable(false);

        ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
        ObjectIdentifier expectedNewIdentifier = ObjectIdentifier.of("cat1", "db1", "tb2");

        // All three spellings must resolve to the same source and target identifiers.
        String[] renameStatements = {
            "alter table cat1.db1.tb1 rename to tb2",
            "alter table db1.tb1 rename to tb2",
            "alter table tb1 rename to cat1.db1.tb2"
        };
        for (String renameStatement : renameStatements) {
            Operation renameOp = parse(renameStatement);
            Assertions.assertThat(renameOp).isInstanceOf(AlterTableRenameOperation.class);
            AlterTableRenameOperation rename = (AlterTableRenameOperation) renameOp;
            Assertions.assertThat(rename.getTableIdentifier()).isEqualTo(expectedIdentifier);
            Assertions.assertThat(rename.getNewTableIdentifier()).isEqualTo(expectedNewIdentifier);
        }
        checkAlterNonExistTable("alter table %s nonexistent rename to tb2");

        // SET options
        checkAlterNonExistTable("alter table %s nonexistent set ('k1' = 'v1', 'K2' = 'V2')");
        Operation setOp = parse("alter table if exists cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')");
        Map<String, String> optionsAfterSet = new HashMap<>();
        optionsAfterSet.put("connector", "dummy");
        optionsAfterSet.put("k", "v");
        optionsAfterSet.put("k1", "v1");
        optionsAfterSet.put("K2", "V2");
        assertAlterTableOptions(
                setOp,
                expectedIdentifier,
                optionsAfterSet,
                Arrays.asList(TableChange.set("k1", "v1"), TableChange.set("K2", "V2")),
                "ALTER TABLE IF EXISTS cat1.db1.tb1\n  SET 'k1' = 'v1',\n  SET 'K2' = 'V2'");

        // RESET options
        checkAlterNonExistTable("alter table %s nonexistent reset ('k')");
        Operation resetOp = parse("alter table if exists cat1.db1.tb1 reset ('k')");
        assertAlterTableOptions(
                resetOp,
                expectedIdentifier,
                Collections.singletonMap("connector", "dummy"),
                Collections.singletonList(TableChange.reset("k")),
                "ALTER TABLE IF EXISTS cat1.db1.tb1\n  RESET 'k'");

        Assertions.assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ('connector')"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("ALTER TABLE RESET does not support changing 'connector'");
        Assertions.assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ()"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("ALTER TABLE RESET does not support empty key");
    }

    /**
     * Covers ALTER TABLE RENAME column: two successful renames (checked through the summary
     * string and the new unresolved schema) followed by the rejected cases, where the column is
     * referenced by a computed column or watermark, is nested, collides with an existing name,
     * or is used as a partition or distribution key.
     */
    @Test
    public void testAlterTableRenameColumn() throws Exception {
        prepareTable("tb1", false, true, 3);

        // Rename physical column c; the primary key is expected to follow the rename.
        Operation renameC = parse("alter table tb1 rename c to c1");
        Assertions.assertThat(renameC).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(renameC.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `c` TO `c1`");
        Assertions.assertThat(
                        ((AlterTableChangeOperation) renameC).getNewTable().getUnresolvedSchema())
                .isEqualTo(
                        Schema.newBuilder()
                                .column("a", DataTypes.INT().notNull())
                                .column("b", DataTypes.BIGINT().notNull())
                                .column("c1", DataTypes.STRING().notNull())
                                .withComment("column comment")
                                .columnByExpression("d", "a*(b+2 + a*b)")
                                .column(
                                        "e",
                                        DataTypes.ROW(
                                                DataTypes.STRING(),
                                                DataTypes.INT(),
                                                DataTypes.ROW(
                                                        DataTypes.DOUBLE(),
                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
                                .columnByExpression("f", "e.f1 + e.f2.f0")
                                .columnByMetadata("g", DataTypes.STRING(), null, true)
                                .column("ts", DataTypes.TIMESTAMP(3))
                                .withComment("just a comment")
                                .watermark("ts", "ts - interval '5' seconds")
                                .primaryKeyNamed("ct1", "a", "b", "c1")
                                .build());

        // Rename computed column f.
        Operation renameF = parse("alter table tb1 rename f to f1");
        Assertions.assertThat(renameF).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(renameF.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `f` TO `f1`");
        Assertions.assertThat(
                        ((AlterTableChangeOperation) renameF).getNewTable().getUnresolvedSchema())
                .isEqualTo(
                        Schema.newBuilder()
                                .column("a", DataTypes.INT().notNull())
                                .column("b", DataTypes.BIGINT().notNull())
                                .column("c", DataTypes.STRING().notNull())
                                .withComment("column comment")
                                .columnByExpression("d", "a*(b+2 + a*b)")
                                .column(
                                        "e",
                                        DataTypes.ROW(
                                                DataTypes.STRING(),
                                                DataTypes.INT(),
                                                DataTypes.ROW(
                                                        DataTypes.DOUBLE(),
                                                        DataTypes.ARRAY(DataTypes.FLOAT()))))
                                .columnByExpression("f1", "e.f1 + e.f2.f0")
                                .columnByMetadata("g", DataTypes.STRING(), null, true)
                                .column("ts", DataTypes.TIMESTAMP(3))
                                .withComment("just a comment")
                                .watermark("ts", "ts - interval '5' seconds")
                                .primaryKeyNamed("ct1", "a", "b", "c")
                                .build());

        // Rejected renames on tb1.
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `a` is referenced by computed column `d`.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `ts` is referenced by watermark expression.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter nested row type e.f1 is not supported yet.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 rename c to a"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `a` already existed in table schema.");

        // tb2: column e feeds two computed columns, column a is the partition key.
        Schema tb2Schema =
                Schema.newBuilder()
                        .column("a", DataTypes.STRING().notNull())
                        .column("b", DataTypes.INT().notNull())
                        .column("e", DataTypes.STRING())
                        .columnByExpression("j", Expressions.$("e").upperCase())
                        .columnByExpression("g", "TO_TIMESTAMP(e)")
                        .primaryKey("a", "b")
                        .build();
        CatalogTable tb2 =
                CatalogTable.newBuilder()
                        .schema(tb2Schema)
                        .comment("tb2")
                        .partitionKeys(Collections.singletonList("a"))
                        .build();
        catalogManager.getCatalog("cat1").get().createTable(new ObjectPath("db1", "tb2"), tb2, true);

        Assertions.assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `g`, `j`.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb2 rename a to a1"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Failed to execute ALTER TABLE statement.\nThe column `a` is used as the partition keys.");
        checkAlterNonExistTable("alter table %s nonexistent rename a to a1");

        // tb3: column c is a distribution key.
        prepareTableWithDistribution("tb3");
        Assertions.assertThatThrownBy(() -> parse("alter table tb3 rename c to a1"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Failed to execute ALTER TABLE statement.\nThe column `c` is used as a distribution key.");
        // NOTE(review): repeats the non-existent-table check made a few lines up; confirm
        // whether the duplication is intentional.
        checkAlterNonExistTable("alter table %s nonexistent rename a to a1");
    }

    /**
     * Checks the rejected ALTER TABLE DROP column cases: unknown column, duplicate column in the
     * drop list, nested field, and columns used by a computed column, the primary key, a
     * distribution key or the watermark expression.
     */
    @Test
    public void testFailedToAlterTableDropColumn() throws Exception {
        prepareTable("tb1", false, true, 3);

        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop x"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `x` does not exist in the base table.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop (g, x)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `x` does not exist in the base table.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop (g, c, g)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Duplicate column `g`.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop e.f2"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter nested row type e.f2 is not supported yet.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop a"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `a` is referenced by computed column `d`.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop c"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `c` is used as the primary key.");

        // The distribution-key case needs its own table.
        prepareTableWithDistribution("tb3");
        Assertions.assertThatThrownBy(() -> parse("alter table tb3 drop c"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `c` is used as a distribution key.");

        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop ts"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The column `ts` is referenced by watermark expression.");

        checkAlterNonExistTable("alter table %s nonexistent drop a");
    }

    /**
     * Drops a single column and then a list of columns, checking the operation type, the summary
     * string and that the dropped names no longer appear in the new unresolved schema.
     */
    @Test
    public void testAlterTableDropColumn() throws Exception {
        prepareTable(false);

        // Single column.
        Operation dropSingle = parse("alter table tb1 drop c");
        Assertions.assertThat(dropSingle).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(dropSingle.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP `c`");
        List<String> namesAfterSingleDrop =
                ((AlterTableChangeOperation) dropSingle)
                        .getNewTable().getUnresolvedSchema().getColumns().stream()
                                .map(column -> column.getName())
                                .collect(Collectors.toList());
        Assertions.assertThat(namesAfterSingleDrop).doesNotContain("c");

        // Several columns; the expected summary lists them as d, f, b, e.
        Operation dropMany = parse("alter table tb1 drop (f, e, b, d)");
        Assertions.assertThat(dropMany).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(dropMany.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n  DROP `d`,\n  DROP `f`,\n  DROP `b`,\n  DROP `e`");
        List<String> namesAfterMultiDrop =
                ((AlterTableChangeOperation) dropMany)
                        .getNewTable().getUnresolvedSchema().getColumns().stream()
                                .map(column -> column.getName())
                                .collect(Collectors.toList());
        Assertions.assertThat(namesAfterMultiDrop).doesNotContain("f", "e", "b", "d");
    }

    /**
     * Checks the rejected ALTER TABLE DROP PRIMARY KEY / DROP CONSTRAINT cases: the statements
     * against {@code tb1} expect a "does not define any primary key" error, and an unknown
     * constraint name on {@code tb2} expects an error listing {@code ct1}.
     */
    @Test
    public void testFailedToAlterTableDropConstraint() throws Exception {
        prepareTable("tb1", 0);
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop primary key"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The base table does not define any primary key.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop constraint ct"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The base table does not define any primary key.");

        prepareTable("tb2", 1);
        Assertions.assertThatThrownBy(() -> parse("alter table tb2 drop constraint ct2"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table does not define a primary key constraint named 'ct2'. Available constraint name: ['ct1'].");

        checkAlterNonExistTable("alter table %s nonexistent drop primary key");
        checkAlterNonExistTable("alter table %s nonexistent drop constraint ct");
    }

    /**
     * Drops the constraint {@code ct1} and checks the operation type, the summary string and
     * that the new unresolved schema no longer carries a primary key.
     */
    @Test
    public void testAlterTableDropConstraint() throws Exception {
        prepareTable(true);

        Operation dropConstraint = parse("alter table tb1 drop constraint ct1");

        Assertions.assertThat(dropConstraint).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(dropConstraint.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP CONSTRAINT ct1");
        Assertions.assertThat(
                        ((AlterTableChangeOperation) dropConstraint)
                                .getNewTable()
                                .getUnresolvedSchema()
                                .getPrimaryKey())
                .isNotPresent();
    }

    /**
     * Drops the distribution of a table prepared with one and checks the operation type, the
     * summary string and that the new table reports no distribution.
     */
    @Test
    public void testAlterTableDropDistribution() throws Exception {
        prepareTableWithDistribution("tb1");

        Operation dropDistribution = parse("alter table tb1 drop distribution");

        Assertions.assertThat(dropDistribution).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(dropDistribution.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP DISTRIBUTION");
        Assertions.assertThat(
                        ((AlterTableChangeOperation) dropDistribution)
                                .getNewTable()
                                .getDistribution())
                .isNotPresent();

        // NOTE(review): this non-existent-table check uses a RENAME statement rather than
        // DROP DISTRIBUTION; it looks copy-pasted. Confirm the intended statement before
        // changing it.
        checkAlterNonExistTable("alter table %s nonexistent rename a to a1");
    }

    /**
     * Checks that DROP DISTRIBUTION on {@code tb1} is rejected with a {@link ValidationException}
     * stating that the table has no distribution to drop.
     */
    @Test
    public void testFailedToAlterTableDropDistribution() throws Exception {
        prepareTable("tb1", false);

        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop distribution"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Table `cat1`.`db1`.`tb1` does not have a distribution to drop.");

        // NOTE(review): this non-existent-table check uses DROP WATERMARK rather than
        // DROP DISTRIBUTION; it looks copy-pasted. Confirm the intended statement before
        // changing it.
        checkAlterNonExistTable("alter table %s nonexistent drop watermark");
    }

    /**
     * Checks that DROP WATERMARK on {@code tb1} is rejected with a {@link ValidationException}
     * stating that no watermark strategy is defined, and runs the non-existent-table check for
     * the same statement.
     */
    @Test
    public void testFailedToAlterTableDropWatermark() throws Exception {
        prepareTable("tb1", false);

        Assertions.assertThatThrownBy(() -> parse("alter table tb1 drop watermark"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("The base table does not define any watermark strategy.");

        checkAlterNonExistTable("alter table %s nonexistent drop watermark");
    }

    /**
     * Drops the watermark of {@code tb1} and checks the operation type, the summary string and
     * that the new unresolved schema has no watermark specs left.
     */
    @Test
    public void testAlterTableDropWatermark() throws Exception {
        prepareTable("tb1", true);

        Operation dropWatermark = parse("alter table tb1 drop watermark");

        Assertions.assertThat(dropWatermark).isInstanceOf(AlterTableChangeOperation.class);
        Assertions.assertThat(dropWatermark.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP WATERMARK");
        Assertions.assertThat(
                        ((AlterTableChangeOperation) dropWatermark)
                                .getNewTable()
                                .getUnresolvedSchema()
                                .getWatermarkSpecs())
                .isEmpty();
    }

    /**
     * Checks the rejected ALTER TABLE ADD column cases: existing or duplicate column names,
     * AFTER referring to an unknown column, invalid computed-column expressions, and any use of
     * nested row fields.
     */
    @Test
    public void testFailedToAlterTableAddColumn() throws Exception {
        prepareTable("tb1", 0);

        // Name clashes.
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add a bigint"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Try to add a column `a` which already exists in the table.");
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb1 add (x array<string>, x string)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Encounter duplicate column `x`.");

        // AFTER pointing at a column that is not (yet) part of the table.
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add x bigint after y"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Referenced column `y` by 'AFTER' does not exist in the table.");
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb1 add (x bigint after y, y string first)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Referenced column `y` by 'AFTER' does not exist in the table.");

        // Computed columns with invalid expressions.
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add m as n + 2"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid expression for computed column 'm'.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add (m as b * 2, n as m + 2)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid expression for computed column 'n'.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add (m as 'hello' || b)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid expression for computed column 'm'.");

        // Nested row fields are not supported, either as the new column or as the AFTER target.
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string)"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add (x string after e.f2)"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter nested row type is not supported yet.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");

        checkAlterNonExistTable("alter table %s nonexistent add a bigint not null");
    }

    /**
     * Checks the conversion of {@code ALTER TABLE ... ADD} statements that add columns.
     *
     * <p>For each statement the test compares the operation's summary string and, through
     * {@code assertAlterTableSchema}, the expected resulting schema. Every expected schema is
     * derived from the schema captured right after {@code prepareTable}, i.e. the three
     * statements are each evaluated against the original table definition.
     */
    @Test
    public void testAlterTableAddColumn() throws Exception {
        this.prepareTable("tb1", 0);
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of((String)"cat1", (String)"db1", (String)"tb1");
        // Snapshot of the table schema before any ALTER statement is parsed.
        Schema originalSchema = ((ContextResolvedTable)this.catalogManager.getTable(tableIdentifier).get()).getTable().getUnresolvedSchema();
        // Case 1: a single physical column with a comment, expected at the end of the schema.
        Operation operation = this.parse("alter table if exists tb1 add h double not null comment 'h is double not null'");
        Assertions.assertThat((String)operation.asSummaryString()).isEqualTo("ALTER TABLE IF EXISTS cat1.db1.tb1\n  ADD `h` DOUBLE NOT NULL COMMENT 'h is double not null' ");
        this.assertAlterTableSchema(operation, tableIdentifier, Schema.newBuilder().fromSchema(originalSchema).column("h", DataTypes.DOUBLE().notNull()).withComment("h is double not null").build());
        // Case 2: computed, metadata and primary-key columns placed with FIRST / AFTER.
        operation = this.parse("alter table tb1 add (\n h as e.f2.f1 first,\n i as b*2 after b,\n j int metadata from 'mk1' virtual comment 'comment_metadata' first,\n k string primary key not enforced after h)");
        Assertions.assertThat((String)operation.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `h` ARRAY<FLOAT> AS `e`.`f2`.`f1` FIRST,\n  ADD `i` BIGINT NOT NULL AS `b` * 2 AFTER `b`,\n  ADD `j` INT METADATA FROM 'mk1' VIRTUAL COMMENT 'comment_metadata' FIRST,\n  ADD `k` STRING NOT NULL AFTER `h`,\n  ADD CONSTRAINT `PK_k` PRIMARY KEY (`k`) NOT ENFORCED");
        // The expected schema is spelled out column by column because the new columns are
        // interleaved with the existing ones (j, h, k first; i directly after b).
        this.assertAlterTableSchema(operation, tableIdentifier, Schema.newBuilder().columnByMetadata("j", (AbstractDataType)DataTypes.INT(), "mk1", true).withComment("comment_metadata").columnByExpression("h", "`e`.`f2`.`f1`").column("k", DataTypes.STRING().notNull()).column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).columnByExpression("i", (Expression)new SqlCallExpression("`b` * 2")).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", (AbstractDataType)DataTypes.ROW((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW((DataType[])new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY((DataType)DataTypes.FLOAT())})})).columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", (AbstractDataType)DataTypes.STRING(), null, true).column("ts", (AbstractDataType)DataTypes.TIMESTAMP((int)3)).withComment("just a comment").primaryKey(new String[]{"k"}).build());
        // Case 3: composite (ROW / MAP) columns plus computed columns that reference them.
        operation = this.parse("alter table tb1 add (\n r row<r1 bigint, r2 string, r3 array<double> not null> not null comment 'add composite type',\n m map<string not null, int not null>,\n n as r.r1 * 2 after r,\n tss as to_timestamp(r.r2) comment 'rowtime' after ts,\n na as r.r3 after ts)");
        Assertions.assertThat((String)operation.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  ADD `r` ROW<`r1` BIGINT, `r2` STRING, `r3` ARRAY<DOUBLE> NOT NULL> NOT NULL COMMENT 'add composite type' ,\n  ADD `m` MAP<STRING NOT NULL, INT NOT NULL> ,\n  ADD `n` BIGINT AS `r`.`r1` * 2 AFTER `r`,\n  ADD `tss` TIMESTAMP(3) AS `to_timestamp`(`r`.`r2`) COMMENT 'rowtime' AFTER `ts`,\n  ADD `na` ARRAY<DOUBLE> NOT NULL AS `r`.`r3` AFTER `ts`");
        this.assertAlterTableSchema(operation, tableIdentifier, Schema.newBuilder().fromSchema(originalSchema).columnByExpression("na", "`r`.`r3`").columnByExpression("tss", "`to_timestamp`(`r`.`r2`)").withComment("rowtime").column("r", DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"r1", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"r2", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"r3", (DataType)((DataType)DataTypes.ARRAY((DataType)DataTypes.DOUBLE()).notNull()))}).notNull()).withComment("add composite type").columnByExpression("n", "`r`.`r1` * 2").column("m", (AbstractDataType)DataTypes.MAP((DataType)((DataType)DataTypes.STRING().notNull()), (DataType)((DataType)DataTypes.INT().notNull()))).build());
    }

    /**
     * Verifies the errors raised when {@code ALTER TABLE ... ADD} tries to define a primary key
     * that cannot be added: the table already has one, the key column is missing or not physical,
     * or the constraint kind / enforcement mode is unsupported.
     *
     * <p>NOTE(review): judging by the expected messages, the int passed to {@code prepareTable}
     * selects how many columns the pre-existing primary key has - confirm against the helper.
     */
    @Test
    public void testFailedToAlterTableAddPk() throws Exception {
        // tb1: expected messages report an existing primary key on `a`
        prepareTable("tb1", 1);
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb1 add primary key(c) not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table has already defined the primary key constraint [`a`]. "
                                + "You might want to drop it before adding a new one.");
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb1 add x string not null primary key not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table has already defined the primary key constraint [`a`]. "
                                + "You might want to drop it before adding a new one");

        // tb2: expected messages report an existing primary key on `a`, `b`
        prepareTable("tb2", 2);
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb2 add primary key(c) not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table has already defined the primary key constraint [`a`, `b`]. "
                                + "You might want to drop it before adding a new one");
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb2 add x string not null primary key not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table has already defined the primary key constraint [`a`, `b`]. "
                                + "You might want to drop it before adding a new one");

        // tb3: invalid key definitions
        prepareTable("tb3", 0);
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb3 add primary key (x) not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid primary key 'PK_x'. Column 'x' does not exist.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb3 add unique(b)"))
                .isInstanceOf(SqlValidateException.class)
                .hasMessageContaining("UNIQUE constraint is not supported yet");
        Assertions.assertThatThrownBy(() -> parse("alter table tb3 add primary key(b)"))
                .isInstanceOf(SqlValidateException.class)
                .hasMessageContaining(
                        "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint");
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb3 add (\n"
                                                + "  x as upper(c),\n"
                                                + "  primary key (d, x) not enforced)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid primary key 'PK_d_x'. Column 'd' is not a physical column.");
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb3 add (primary key (g) not enforced)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid primary key 'PK_g'. Column 'g' is not a physical column.");

        checkAlterNonExistTable("alter table %s nonexistent add primary key(x) not enforced");
    }

    /**
     * Checks the conversion of {@code ALTER TABLE ... ADD} statements that define a primary key,
     * either as a table constraint or inline on a newly added column.
     *
     * <p>Each statement is checked for its summary string and, through
     * {@code assertAlterTableSchema}, for the expected resulting schema, which is always derived
     * from the schema captured before the first statement is parsed.
     */
    @Test
    public void testAlterTableAddPrimaryKey() throws Exception {
        prepareTable("tb1", 0);
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
        Schema originalSchema =
                catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();

        // named constraint over two existing columns
        Operation operation =
                parse("alter table tb1 add constraint my_pk primary key (a, b) not enforced");
        Assertions.assertThat(operation.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n"
                                + "  ADD CONSTRAINT `my_pk` PRIMARY KEY (`a`, `b`) NOT ENFORCED");
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromSchema(originalSchema)
                        .primaryKeyNamed("my_pk", "a", "b")
                        .build());

        // new NOT NULL column declared as primary key inline
        operation = parse("alter table tb1 add x bigint not null primary key not enforced");
        Assertions.assertThat(operation.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n"
                                + "  ADD `x` BIGINT NOT NULL ,\n"
                                + "  ADD CONSTRAINT `PK_x` PRIMARY KEY (`x`) NOT ENFORCED");
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromSchema(originalSchema)
                        .column("x", DataTypes.BIGINT().notNull())
                        .primaryKey("x")
                        .build());

        // same column without an explicit NOT NULL: the expected type is still NOT NULL
        operation = parse("alter table tb1 add x bigint primary key not enforced");
        Assertions.assertThat(operation.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n"
                                + "  ADD `x` BIGINT NOT NULL ,\n"
                                + "  ADD CONSTRAINT `PK_x` PRIMARY KEY (`x`) NOT ENFORCED");
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromSchema(originalSchema)
                        .column("x", DataTypes.BIGINT().notNull())
                        .primaryKey("x")
                        .build());

        // key on the existing column `ts`: the expected schema re-declares it as NOT NULL
        operation = parse("alter table tb1 add constraint ct primary key(ts) not enforced");
        Assertions.assertThat(operation.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n"
                                + "  ADD CONSTRAINT `ct` PRIMARY KEY (`ts`) NOT ENFORCED");
        // every original column except the last one, which is replaced by the `ts` column below
        List<Schema.UnresolvedColumn> subColumns =
                originalSchema.getColumns().subList(0, originalSchema.getColumns().size() - 1);
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromColumns(subColumns)
                        .column("ts", DataTypes.TIMESTAMP(3).notNull())
                        .withComment("just a comment")
                        .primaryKeyNamed("ct", "ts")
                        .build());
    }

    /**
     * Verifies the errors raised by invalid {@code ALTER TABLE ... ADD WATERMARK} statements:
     * unknown rowtime column, unsupported rowtime type, nested rowtime column, a watermark that is
     * already defined, and a non-existent target table.
     *
     * <p>NOTE(review): the boolean passed to {@code prepareTable} presumably controls whether the
     * table is created with a watermark - confirm against the helper.
     */
    @Test
    public void testFailedToAlterTableAddWatermark() throws Exception {
        prepareTable("tb1", false);
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add watermark for x as x"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid column name 'x' for rowtime attribute in watermark declaration. "
                                + "Available columns are: [a, b, c, d, e, f, g, ts]");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 add watermark for b as b"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid data type of time field for watermark definition. "
                                + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), "
                                + "the supported precision 'p' is from 0 to 3, "
                                + "but the time field type is BIGINT NOT NULL");
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb1 add (x row<f0 string, f1 timestamp(3)>, watermark for x.f1 as x.f1)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Watermark strategy on nested column is not supported yet.");

        prepareTable("tb2", true);
        Assertions.assertThatThrownBy(() -> parse("alter table tb2 add watermark for ts as ts"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table has already defined the watermark strategy "
                                + "`ts` AS ts - interval '5' seconds. "
                                + "You might want to drop it before adding a new one.");

        checkAlterNonExistTable("alter table %s nonexistent add watermark for ts as ts");
    }

    /**
     * Checks the conversion of {@code ALTER TABLE ... ADD} statements that define a watermark,
     * on an existing column as well as on physical or computed columns added by the same
     * statement.
     *
     * <p>Each statement is checked for its summary string and for the expected schema, which is
     * always built on top of the schema captured before the first statement is parsed.
     */
    @Test
    public void testAlterTableAddWatermark() throws Exception {
        prepareTable("tb1", false);
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
        Schema originalSchema =
                catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();

        // watermark on the existing column `ts`
        Operation operation = parse("alter table tb1 add watermark for ts as ts");
        Assertions.assertThat(operation.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n"
                                + "  ADD WATERMARK FOR `ts`: TIMESTAMP(3) AS `ts`");
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder().fromSchema(originalSchema).watermark("ts", "`ts`").build());

        // watermark on a physical column added by the same statement
        operation =
                parse("alter table tb1 add (tss timestamp(3) not null, watermark for tss as tss)");
        Assertions.assertThat(operation.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n"
                                + "  ADD `tss` TIMESTAMP(3) NOT NULL ,\n"
                                + "  ADD WATERMARK FOR `tss`: TIMESTAMP(3) NOT NULL AS `tss`");
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromSchema(originalSchema)
                        .column("tss", DataTypes.TIMESTAMP(3).notNull())
                        .watermark("tss", "`tss`")
                        .build());

        // watermark on a computed column added by the same statement
        operation =
                parse(
                        "alter table tb1 add (log_ts string not null,\n"
                                + "tss as to_timestamp(log_ts),\n"
                                + "watermark for tss as tss - interval '3' second)");
        Assertions.assertThat(operation.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n"
                                + "  ADD `log_ts` STRING NOT NULL ,\n"
                                + "  ADD `tss` TIMESTAMP(3) AS `to_timestamp`(`log_ts`) ,\n"
                                + "  ADD WATERMARK FOR `tss`: TIMESTAMP(3) AS `tss` - INTERVAL '3' SECOND");
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromSchema(originalSchema)
                        .column("log_ts", DataTypes.STRING().notNull())
                        .columnByExpression("tss", "`to_timestamp`(`log_ts`)")
                        .watermark("tss", "`tss` - INTERVAL '3' SECOND")
                        .build());

        // watermark on a computed column that projects a field out of a new ROW column
        operation =
                parse(
                        "alter table tb1 add (x row<f0 string, f1 timestamp(3) not null> not null, y as x.f1, watermark for y as y - interval '1' day)");
        Assertions.assertThat(operation.asSummaryString())
                .isEqualTo(
                        "ALTER TABLE cat1.db1.tb1\n"
                                + "  ADD `x` ROW<`f0` STRING, `f1` TIMESTAMP(3) NOT NULL> NOT NULL ,\n"
                                + "  ADD `y` TIMESTAMP(3) NOT NULL AS `x`.`f1` ,\n"
                                + "  ADD WATERMARK FOR `y`: TIMESTAMP(3) NOT NULL AS `y` - INTERVAL '1' DAY");
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromSchema(originalSchema)
                        .column(
                                "x",
                                DataTypes.ROW(
                                                DataTypes.STRING(),
                                                DataTypes.TIMESTAMP(3).notNull())
                                        .notNull())
                        .columnByExpression("y", "`x`.`f1`")
                        .watermark("y", "`y` - INTERVAL '1' DAY")
                        .build());
    }

    /**
     * Verifies the errors raised by invalid {@code ALTER TABLE ... MODIFY <column>} statements:
     * duplicated or unknown columns, unknown {@code AFTER} references, changes that invalidate a
     * computed column, the watermark or the primary key, nested row fields, and a non-existent
     * target table.
     */
    @Test
    public void testFailedToAlterTableModifyColumn() throws Exception {
        prepareTable("tb1", true);

        // column lookup problems
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb1 modify (b int, b array<int not null>)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Encounter duplicate column `b`.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 modify x bigint"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Try to modify a column `x` which does not exist in the table.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 modify a bigint after x"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Referenced column `x` by 'AFTER' does not exist in the table.");

        // modifications that invalidate an existing computed column
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 modify e array<int>"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid expression for computed column 'f'.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 modify a string"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid expression for computed column 'd'.");
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 modify b as a + 2"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid expression for computed column 'd'.");
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb1 modify (a timestamp(3), b multiset<int>)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid expression for computed column 'd'.");

        // modification that invalidates the watermark's time field
        Assertions.assertThatThrownBy(() -> parse("alter table tb1 modify ts int"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid data type of time field for watermark definition. "
                                + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), "
                                + "the supported precision 'p' is from 0 to 3, "
                                + "but the time field type is INT");

        // modifications that invalidate the primary key 'ct1'
        prepareTable("tb2", 1);
        Assertions.assertThatThrownBy(() -> parse("alter table tb2 modify (d int, a as b + 2)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid primary key 'ct1'. Column 'a' is not a physical column.");
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb2 modify (d string, a int metadata virtual)"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid primary key 'ct1'. Column 'a' is not a physical column.");

        // nested row fields
        Assertions.assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string)"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb2 modify (g string after e.f2)"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter nested row type is not supported yet.");
        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb2 modify (e.f0 string after e.f1)"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");

        checkAlterNonExistTable("alter table %s nonexistent modify a int first");
    }

    /**
     * Checks the conversion of {@code ALTER TABLE ... MODIFY} statements that change column
     * types, comments and positions, optionally together with the primary key or the watermark.
     *
     * <p>For each statement the test compares the operation's summary string and, through
     * {@code assertAlterTableSchema}, the complete expected schema. The expected schemas are
     * spelled out column by column because the statements reorder columns.
     */
    @Test
    public void testAlterTableModifyColumn() throws Exception {
        // NOTE(review): the expected schemas below carry a primary key 'ct1' on (a, b), so the
        // int argument presumably selects a two-column primary key - confirm against prepareTable.
        this.prepareTable("tb1", 2);
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of((String)"cat1", (String)"db1", (String)"tb1");
        // Case 1: add a comment to `b` and move it to the first position.
        Operation operation = this.parse("alter table tb1 modify b bigint not null comment 'move b to first and add comment' first");
        Assertions.assertThat((String)operation.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `b` COMMENT 'move b to first and add comment',\n  MODIFY `b` FIRST");
        this.assertAlterTableSchema(operation, tableIdentifier, Schema.newBuilder().column("b", DataTypes.BIGINT().notNull()).withComment("move b to first and add comment").column("a", DataTypes.INT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", (AbstractDataType)DataTypes.ROW((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW((DataType[])new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY((DataType)DataTypes.FLOAT())})})).columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", (AbstractDataType)DataTypes.STRING(), null, true).column("ts", (AbstractDataType)DataTypes.TIMESTAMP((int)3)).withComment("just a comment").primaryKeyNamed("ct1", new String[]{"a", "b"}).build());
        // Case 2: make `ts` NOT NULL and move it directly after `e`.
        operation = this.parse("alter table tb1 modify ts timestamp(3) not null after e");
        Assertions.assertThat((String)operation.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `ts` TIMESTAMP(3) NOT NULL,\n  MODIFY `ts` AFTER `e`");
        this.assertAlterTableSchema(operation, tableIdentifier, Schema.newBuilder().column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").column("e", (AbstractDataType)DataTypes.ROW((DataType[])new DataType[]{DataTypes.STRING(), DataTypes.INT(), DataTypes.ROW((DataType[])new DataType[]{DataTypes.DOUBLE(), DataTypes.ARRAY((DataType)DataTypes.FLOAT())})})).column("ts", DataTypes.TIMESTAMP((int)3).notNull()).withComment("just a comment").columnByExpression("f", "e.f1 + e.f2.f0").columnByMetadata("g", (AbstractDataType)DataTypes.STRING(), null, true).primaryKeyNamed("ct1", new String[]{"a", "b"}).build());
        // Case 3: several columns modified at once, together with a new primary key 'ct2' on `e`.
        operation = this.parse("alter table tb1 modify (\nd as a + 2 comment 'change d' after b,\nc bigint first,\ne string comment 'change e',\nf as upper(e) comment 'change f' after ts,\ng int not null comment 'change g',\nconstraint ct2 primary key(e) not enforced)");
        Assertions.assertThat((String)operation.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n  MODIFY `d` INT NOT NULL AS `a` + 2 COMMENT 'change d' AFTER `b`,\n  MODIFY `c` BIGINT,\n  MODIFY `c` FIRST,\n  MODIFY `e` COMMENT 'change e',\n  MODIFY `e` STRING NOT NULL,\n  MODIFY `f` STRING NOT NULL AS UPPER(`e`) COMMENT 'change f' AFTER `ts`,\n  MODIFY `g` INT NOT NULL COMMENT 'change g' ,\n  MODIFY CONSTRAINT `ct2` PRIMARY KEY (`e`) NOT ENFORCED");
        this.assertAlterTableSchema(operation, tableIdentifier, Schema.newBuilder().column("c", (AbstractDataType)DataTypes.BIGINT()).withComment("column comment").column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).columnByExpression("d", "`a` + 2").withComment("change d").column("e", DataTypes.STRING().notNull()).withComment("change e").column("g", DataTypes.INT().notNull()).withComment("change g").column("ts", (AbstractDataType)DataTypes.TIMESTAMP((int)3)).withComment("just a comment").columnByExpression("f", "UPPER(`e`)").withComment("change f").primaryKeyNamed("ct2", new String[]{"e"}).build());
        // Case 4: a second table; columns are modified and the watermark is re-declared on `f`.
        // NOTE(review): the boolean presumably creates tb2 with a watermark - confirm.
        this.prepareTable("tb2", true);
        tableIdentifier = ObjectIdentifier.of((String)"cat1", (String)"db1", (String)"tb2");
        operation = this.parse("alter table tb2 modify (ts int comment 'change ts',\nf timestamp(3) not null,\ne int metadata virtual,\nwatermark for f as f,\ng multiset<int> not null comment 'change g' first)");
        Assertions.assertThat((String)operation.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb2\n  MODIFY `ts` COMMENT 'change ts',\n  MODIFY `ts` INT,\n  MODIFY `f` TIMESTAMP(3) NOT NULL ,\n  MODIFY `e` INT METADATA VIRTUAL ,\n  MODIFY `g` MULTISET<INT> NOT NULL COMMENT 'change g' FIRST,\n  MODIFY WATERMARK FOR `f`: TIMESTAMP(3) NOT NULL AS `f`");
        this.assertAlterTableSchema(operation, tableIdentifier, Schema.newBuilder().column("g", DataTypes.MULTISET((DataType)DataTypes.INT()).notNull()).withComment("change g").column("a", DataTypes.INT().notNull()).column("b", DataTypes.BIGINT().notNull()).column("c", DataTypes.STRING().notNull()).withComment("column comment").columnByExpression("d", "a*(b+2 + a*b)").columnByMetadata("e", (AbstractDataType)DataTypes.INT(), null, true).column("f", DataTypes.TIMESTAMP((int)3).notNull()).column("ts", (AbstractDataType)DataTypes.INT()).withComment("change ts").watermark("f", "`f`").build());
    }

    /**
     * Verifies the errors raised by invalid {@code ALTER TABLE ... MODIFY CONSTRAINT ... PRIMARY
     * KEY} statements: no primary key to modify, a key column that does not exist or is not
     * physical, and a non-existent target table.
     */
    @Test
    public void testFailedToAlterTableModifyPk() throws Exception {
        // tb1: the expected message states that no primary key is defined
        prepareTable("tb1", 0);
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb1 modify constraint ct primary key (b) not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table does not define any primary key constraint. "
                                + "You might want to add a new one.");

        // tb2: the new key definition itself is invalid
        prepareTable("tb2", 1);
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb2 modify constraint ct primary key (x) not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Invalid primary key 'ct'. Column 'x' does not exist.");
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb2 modify constraint ct primary key (d) not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid primary key 'ct'. Column 'd' is not a physical column.");
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb2 modify constraint ct primary key (g) not enforced"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid primary key 'ct'. Column 'g' is not a physical column.");

        checkAlterNonExistTable(
                "alter table %s nonexistent modify constraint ct primary key(a) not enforced");
    }

    /**
     * Checks the conversion of {@code ALTER TABLE ... MODIFY [CONSTRAINT name] PRIMARY KEY}
     * statements: once with an explicit constraint name and once without, in which case the
     * expected key name is {@code PK_c_a}.
     */
    @Test
    public void testAlterTableModifyPk() throws Exception {
        prepareTable("tb1", 1);
        Operation operation =
                parse("alter table tb1 modify constraint ct2 primary key (a, b) not enforced");

        // One identifier is reused for the schema lookup and for both schema assertions.
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
        Schema originalSchema =
                catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();

        // named constraint: same columns as before, only the primary key differs
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromColumns(originalSchema.getColumns())
                        .primaryKeyNamed("ct2", "a", "b")
                        .build());

        // unnamed constraint
        operation = parse("alter table tb1 modify primary key (c, a) not enforced");
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .column("a", DataTypes.INT().notNull())
                        .column("b", DataTypes.BIGINT().notNull())
                        .column("c", DataTypes.STRING().notNull())
                        .withComment("column comment")
                        .columnByExpression("d", "a*(b+2 + a*b)")
                        .column(
                                "e",
                                DataTypes.ROW(
                                        DataTypes.STRING(),
                                        DataTypes.INT(),
                                        DataTypes.ROW(
                                                DataTypes.DOUBLE(),
                                                DataTypes.ARRAY(DataTypes.FLOAT()))))
                        .columnByExpression("f", "e.f1 + e.f2.f0")
                        .columnByMetadata("g", DataTypes.STRING(), null, true)
                        .column("ts", DataTypes.TIMESTAMP(3))
                        .withComment("just a comment")
                        .primaryKeyNamed("PK_c_a", "c", "a")
                        .build());
    }

    /**
     * Checks that {@code ALTER TABLE ... ADD DISTRIBUTION BY HASH(a) INTO 12 BUCKETS} yields a
     * hash distribution on {@code a} with 12 buckets and the expected summary string.
     */
    @Test
    public void testAlterTableAddDistribution() throws Exception {
        prepareTable("tb1", false);

        Operation addDistribution =
                parse("alter table tb1 add distribution by hash(a) into 12 buckets");

        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
        TableDistribution expectedDistribution =
                TableDistribution.ofHash(Collections.singletonList("a"), 12);
        assertAlterTableDistribution(
                addDistribution,
                tableIdentifier,
                expectedDistribution,
                "ALTER TABLE cat1.db1.tb1\n  ADD DISTRIBUTED BY HASH(`a`) INTO 12 BUCKETS\n");
    }

    /**
     * Verifies that adding a distribution to a table prepared with
     * {@code prepareTableWithDistribution} is rejected with a {@link ValidationException}.
     */
    @Test
    public void testFailedToAlterTableAddDistribution() throws Exception {
        prepareTableWithDistribution("tb1");

        Assertions.assertThatThrownBy(
                        () -> parse("alter table tb1 add distribution by hash(a) into 12 buckets"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("You can modify it or drop it before adding a new one.");
    }

    /**
     * Verifies that modifying the distribution of a table prepared with plain
     * {@code prepareTable} is rejected, with a message stating that no distribution is defined.
     */
    @Test
    public void testFailedToAlterTableModifyDistribution() throws Exception {
        prepareTable("tb2", false);

        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb2 modify distribution by hash(a) into 12 buckets"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table does not define any distribution. "
                                + "You might want to add a new one.");
    }

    /**
     * Checks that {@code ALTER TABLE ... MODIFY DISTRIBUTION BY HASH(c) INTO 12 BUCKETS} on a
     * table prepared with {@code prepareTableWithDistribution} yields a hash distribution on
     * {@code c} with 12 buckets and the expected summary string.
     */
    @Test
    public void testAlterTableModifyDistribution() throws Exception {
        prepareTableWithDistribution("tb1");

        Operation modifyDistribution =
                parse("alter table tb1 modify distribution by hash(c) into 12 buckets");

        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
        TableDistribution expectedDistribution =
                TableDistribution.ofHash(Collections.singletonList("c"), 12);
        assertAlterTableDistribution(
                modifyDistribution,
                tableIdentifier,
                expectedDistribution,
                "ALTER TABLE cat1.db1.tb1\n  MODIFY DISTRIBUTED BY HASH(`c`) INTO 12 BUCKETS\n");
    }

    /**
     * Verifies the errors raised by invalid {@code ALTER TABLE ... MODIFY WATERMARK} statements:
     * no watermark to modify, a rowtime column of an unsupported type, and a non-existent target
     * table.
     *
     * <p>NOTE(review): the boolean passed to {@code prepareTable} presumably controls whether the
     * table is created with a watermark - confirm against the helper.
     */
    @Test
    public void testFailedToAlterTableModifyWatermark() throws Exception {
        prepareTable("tb1", false);
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb1 modify watermark for a as to_timestamp(a) - interval '1' minute"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "The base table does not define any watermark. "
                                + "You might want to add a new one.");

        prepareTable("tb2", true);
        Assertions.assertThatThrownBy(() -> parse("alter table tb2 modify watermark for a as a"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid data type of time field for watermark definition. "
                                + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), "
                                + "the supported precision 'p' is from 0 to 3, "
                                + "but the time field type is INT NOT NULL");
        Assertions.assertThatThrownBy(
                        () ->
                                parse(
                                        "alter table tb2 modify watermark for c as to_timestamp(c) - interval '1' day"))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining(
                        "Invalid data type of time field for watermark definition. "
                                + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), "
                                + "the supported precision 'p' is from 0 to 3, "
                                + "but the time field type is STRING");

        checkAlterNonExistTable("alter table %s nonexistent modify watermark for ts as ts");
    }

    /**
     * Checks the conversion of {@code ALTER TABLE ... MODIFY WATERMARK} statements: first only
     * the watermark expression is replaced, then column {@code g} is re-typed and the watermark
     * is moved onto it in the same statement.
     */
    @Test
    public void testAlterTableModifyWatermark() throws Exception {
        prepareTable("tb1", true);
        Operation operation = parse("alter table tb1 modify watermark for ts as ts");

        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
        Schema originalSchema =
                catalogManager.getTable(tableIdentifier).get().getTable().getUnresolvedSchema();
        List<Schema.UnresolvedColumn> columns = originalSchema.getColumns();

        // same columns, new watermark expression
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder().fromColumns(columns).watermark("ts", "`ts`").build());

        operation = parse("alter table tb1 modify (g timestamp(3) not null, watermark for g as g)");
        // The last two original columns are dropped from the prefix and re-declared explicitly
        // as `g` (with its new type) and `ts`.
        assertAlterTableSchema(
                operation,
                tableIdentifier,
                Schema.newBuilder()
                        .fromColumns(columns.subList(0, columns.size() - 2))
                        .column("g", DataTypes.TIMESTAMP(3).notNull())
                        .column("ts", DataTypes.TIMESTAMP(3))
                        .withComment("just a comment")
                        .watermark("g", "`g`")
                        .build());
    }

    /**
     * Checks that a {@code CREATE TEMPORARY VIEW} statement whose query uses
     * {@code MATCH_RECOGNIZE} is converted into a {@link CreateViewOperation}.
     *
     * <p>The source table {@code events} is registered in the catalog manager first so that the
     * view's query can reference it.
     */
    @Test
    public void testCreateViewWithMatchRecognize() {
        Map<String, String> prop = new HashMap<>();
        prop.put("connector", "values");
        prop.put("bounded", "true");

        Schema eventsSchema =
                Schema.newBuilder()
                        .column("id", DataTypes.INT().notNull())
                        .column("measurement", DataTypes.BIGINT().notNull())
                        .column(
                                "ts",
                                DataTypes.ROW(DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))))
                        .build();
        CatalogTable catalogTable =
                CatalogTable.newBuilder().schema(eventsSchema).options(prop).build();
        catalogManager.createTable(
                catalogTable, ObjectIdentifier.of("builtin", "default", "events"), false);

        // The statement is kept as a single literal so its whitespace stays exactly as written.
        String sql = "CREATE TEMPORARY VIEW foo AS SELECT * FROM events MATCH_RECOGNIZE (    PARTITION BY id     ORDER BY ts ASC     MEASURES       next_step.measurement - this_step.measurement AS diff     AFTER MATCH SKIP TO NEXT ROW     PATTERN (this_step next_step)    DEFINE          this_step AS TRUE,         next_step AS TRUE)";
        Operation operation = parse(sql);

        Assertions.assertThat(operation).isInstanceOf(CreateViewOperation.class);
    }

    /**
     * Verifies that a {@code CREATE VIEW} statement whose query carries an {@code OPTIONS}
     * hint on the source table is converted into a {@link CreateViewOperation}.
     *
     * <p>Before parsing, a table {@code builtin.default.sourceA} with the options
     * {@code connector=values} and {@code bounded=true} is created through the catalog manager.
     */
    @Test
    public void testCreateViewWithDynamicTableOptions() {
        Map<String, String> prop = new HashMap<>();
        prop.put("connector", "values");
        prop.put("bounded", "true");

        Schema schema = Schema.newBuilder()
                .column("f0", DataTypes.INT())
                .column("f1", DataTypes.VARCHAR(20))
                .build();
        CatalogTable catalogTable = CatalogTable.newBuilder().schema(schema).options(prop).build();
        this.catalogManager.createTable(catalogTable, ObjectIdentifier.of("builtin", "default", "sourceA"), false);

        String sql = "create view test_view as\nselect *\nfrom sourceA /*+ OPTIONS('changelog-mode'='I') */";
        // Parse the declared statement instead of a second, independently maintained copy of it.
        Operation operation = this.parse(sql);
        Assertions.assertThat(operation).isInstanceOf(CreateViewOperation.class);
    }

    /**
     * Checks that {@code ALTER TABLE ... ADD PARTITION} statements are converted into
     * {@link AddPartitionsOperation} instances with the expected summary strings.
     *
     * <p>Three variants are covered: a single partition, a single partition with a
     * {@code WITH} clause, and {@code IF NOT EXISTS} with two partition specs.
     *
     * @throws Exception if preparing table {@code tb1} fails
     */
    @Test
    public void testAlterTableAddPartitions() throws Exception {
        this.prepareTable("tb1", true, true, 0);

        // Plain partition spec.
        Operation addSingle = this.parse("alter table tb1 add partition (b = '1', c = '2')");
        Assertions.assertThat(addSingle).isInstanceOf(AddPartitionsOperation.class);
        Assertions.assertThat(addSingle.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1 ADD PARTITION (b=1, c=2)");

        // Partition spec followed by partition properties.
        Operation addWithProperties = this.parse("alter table tb1 add partition (b = '1', c = '2') with ('k' = 'v')");
        Assertions.assertThat(addWithProperties).isInstanceOf(AddPartitionsOperation.class);
        Assertions.assertThat(addWithProperties.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1 ADD PARTITION (b=1, c=2) WITH (k: [v])");

        // IF NOT EXISTS with two partition specs in one statement.
        Operation addMultiple = this.parse("alter table tb1 add if not exists partition (b = '1', c = '2') with ('k' = 'v') partition (b = '2')");
        Assertions.assertThat(addMultiple).isInstanceOf(AddPartitionsOperation.class);
        Assertions.assertThat(addMultiple.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1 ADD IF NOT EXISTS PARTITION (b=1, c=2) WITH (k: [v]) PARTITION (b=2)");
    }

    /**
     * Checks that {@code ALTER TABLE ... DROP PARTITION} statements are converted into
     * {@link DropPartitionsOperation} instances with the expected summary strings, both for a
     * single partition and for {@code IF EXISTS} with two partition specs.
     *
     * @throws Exception if preparing table {@code tb1} fails
     */
    @Test
    public void testAlterTableDropPartitions() throws Exception {
        this.prepareTable("tb1", true, true, 0);

        // Single partition spec.
        Operation dropSingle = this.parse("alter table tb1 drop partition (b = '1', c = '2')");
        Assertions.assertThat(dropSingle).isInstanceOf(DropPartitionsOperation.class);
        Assertions.assertThat(dropSingle.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1 DROP PARTITION (b=1, c=2)");

        // IF EXISTS with two comma-separated partition specs.
        Operation dropMultiple = this.parse("alter table tb1 drop if exists partition (b = '1', c = '2'), partition (b = '2')");
        Assertions.assertThat(dropMultiple).isInstanceOf(DropPartitionsOperation.class);
        Assertions.assertThat(dropMultiple.asSummaryString())
                .isEqualTo("ALTER TABLE cat1.db1.tb1 DROP IF EXISTS PARTITION (b=1, c=2) PARTITION (b=2)");
    }

    /**
     * Checks how {@code CREATE VIEW} handles duplicate column names.
     *
     * <p>A view whose explicit column list ({@code a, b}) renames a query with duplicate output
     * names must convert to a {@link CreateViewOperation}. Statements that leave a duplicate
     * name in the resulting view must fail with a {@link SqlValidateException} somewhere in the
     * cause chain, carrying the expected message and position.
     */
    @Test
    public void testCreateViewWithDuplicateFieldName() {
        Map<String, String> prop = new HashMap<>();
        prop.put("connector", "values");
        prop.put("bounded", "true");

        Schema schema = Schema.newBuilder()
                .column("id", DataTypes.BIGINT().notNull())
                .column("uid", DataTypes.BIGINT().notNull())
                .build();
        CatalogTable catalogTable = CatalogTable.newBuilder().schema(schema).options(prop).build();
        this.catalogManager.createTable(catalogTable, ObjectIdentifier.of("builtin", "default", "id_table"), false);

        // The explicit, distinct column list makes this statement valid.
        Operation operation = this.parse("CREATE VIEW id_view(a, b) AS SELECT id, uid AS id FROM id_table");
        Assertions.assertThat(operation).isInstanceOf(CreateViewOperation.class);

        // Each entry is {statement, expected validation message}; checked in declaration order.
        String[][] failingCases = {
            {
                "CREATE VIEW id_view(a, a) AS SELECT id, uid AS id FROM id_table",
                "A column with the same name `a` has been defined at line 1, column 37."
            },
            {
                "CREATE VIEW id_view AS\nSELECT id, uid AS id FROM id_table",
                "A column with the same name `id` has been defined at line 2, column 8."
            },
            {
                "CREATE VIEW union_view AS\n  SELECT id, uid AS id FROM id_table\n  UNION\n  SELECT uid, id AS uid FROM id_table",
                "A column with the same name `id` has been defined at line 2, column 10."
            },
            {
                "CREATE VIEW cte_view AS\nWITH id_num AS (\n  select id from id_table\n)\nSELECT id, uid as id\nFROM id_table\n",
                "A column with the same name `id` has been defined at line 5, column 8."
            }
        };
        for (String[] failingCase : failingCases) {
            Assertions.assertThatThrownBy(() -> this.parse(failingCase[0]))
                    .satisfies(FlinkAssertions.anyCauseMatches(SqlValidateException.class, failingCase[1]));
        }
    }

    /**
     * Builds a {@link TestItem} from exactly two arguments.
     *
     * @param args two values: the test expression (a {@code String}) followed by the
     *     expectation; a {@code String} expectation is recorded as the expected error, any
     *     other value as the expected type
     * @return the populated test item
     */
    private static TestItem createTestItem(Object ... args) {
        Assertions.assertThat(args).hasSize(2);
        final TestItem item = TestItem.fromTestExpr((String) args[0]);
        final Object expectation = args[1];
        // Both setters return the item itself, so the result can be returned directly.
        return expectation instanceof String
                ? item.withExpectedError((String) expectation)
                : item.withExpectedType(expectation);
    }

    /**
     * Prepares table {@code tb1}, with a one-column primary key when requested.
     *
     * @param hasConstraint {@code true} to prepare the table with one primary-key field,
     *     {@code false} for none
     * @throws Exception if the delegated table preparation fails
     */
    private void prepareTable(boolean hasConstraint) throws Exception {
        final int pkFieldCount = hasConstraint ? 1 : 0;
        prepareTable("tb1", pkFieldCount);
    }

    /**
     * Prepares a table without partitions or watermark and with the given number of
     * primary-key fields.
     *
     * @param tableName name of the table to create
     * @param numOfPkFields number of primary-key fields, forwarded unchanged
     * @throws Exception if the delegated table preparation fails
     */
    private void prepareTable(String tableName, int numOfPkFields) throws Exception {
        final boolean partitioned = false;
        final boolean watermarked = false;
        prepareTable(tableName, partitioned, watermarked, numOfPkFields);
    }

    /**
     * Prepares a table without partitions or primary key, optionally with a watermark.
     *
     * @param tableName name of the table to create
     * @param hasWatermark whether the table declares a watermark
     * @throws Exception if the delegated table preparation fails
     */
    private void prepareTable(String tableName, boolean hasWatermark) throws Exception {
        final boolean partitioned = false;
        final int pkFieldCount = 0;
        prepareTable(tableName, partitioned, hasWatermark, pkFieldCount);
    }

    /**
     * Prepares a table without partitions, watermark, or primary key that uses a
     * {@code HASH} distribution with a bucket count of 6 over column {@code c}.
     *
     * @param tableName name of the table to create
     * @throws Exception if the delegated table preparation fails
     */
    private void prepareTableWithDistribution(String tableName) throws Exception {
        final List<String> bucketKeys = Collections.singletonList("c");
        final TableDistribution hashByC = TableDistribution.of(TableDistribution.Kind.HASH, 6, bucketKeys);
        prepareTable(tableName, false, false, 0, hashByC);
    }

    /**
     * Prepares a table without an explicit distribution.
     *
     * @param tableName name of the table to create
     * @param hasPartition whether the table is partitioned
     * @param hasWatermark whether the table declares a watermark
     * @param numOfPkFields number of primary-key fields
     * @throws Exception if the delegated table preparation fails
     */
    private void prepareTable(String tableName, boolean hasPartition, boolean hasWatermark, int numOfPkFields) throws Exception {
        final TableDistribution noDistribution = null;
        prepareTable(tableName, hasPartition, hasWatermark, numOfPkFields, noDistribution);
    }

    /**
     * Creates the test table {@code cat1.db1.<tableName>} and makes {@code cat1}/{@code db1}
     * the current catalog and database.
     *
     * <p>Catalog {@code cat1} is registered only if the catalog manager does not know it yet;
     * database {@code db1} and the table are created with the boolean flag of the respective
     * create call set to {@code true}. The schema has the columns {@code a}, {@code b},
     * {@code c}, {@code d} (computed), {@code e} (nested row), {@code f} (computed),
     * {@code g} (metadata) and {@code ts}. The table options are {@code k=v} and
     * {@code connector=dummy}.
     *
     * @param tableName name of the table to create
     * @param hasPartition whether the table is partitioned by {@code b} and {@code c}
     * @param hasWatermark whether a watermark {@code ts - interval '5' seconds} is declared on
     *     {@code ts}
     * @param numOfPkFields number of leading columns of {@code a, b, c} forming the primary key
     *     {@code ct1}; {@code 0} for no primary key
     * @param tableDistribution distribution to set on the table, or {@code null} for none
     * @throws IllegalArgumentException if {@code numOfPkFields} is not between 0 and 3
     * @throws Exception if a catalog manager call fails
     */
    private void prepareTable(String tableName, boolean hasPartition, boolean hasWatermark, int numOfPkFields, @Nullable TableDistribution tableDistribution) throws Exception {
        // Instantiate the in-memory catalog only when it actually has to be registered.
        if (!this.catalogManager.getCatalog("cat1").isPresent()) {
            this.catalogManager.registerCatalog("cat1", new GenericInMemoryCatalog("default", "default"));
        }
        this.catalogManager.createDatabase("cat1", "db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);

        Schema.Builder builder = Schema.newBuilder()
                .column("a", DataTypes.INT().notNull())
                .column("b", DataTypes.BIGINT().notNull())
                .column("c", DataTypes.STRING().notNull())
                .withComment("column comment")
                .columnByExpression("d", "a*(b+2 + a*b)")
                .column(
                        "e",
                        DataTypes.ROW(
                                DataTypes.STRING(),
                                DataTypes.INT(),
                                DataTypes.ROW(DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.FLOAT()))))
                .columnByExpression("f", "e.f1 + e.f2.f0")
                .columnByMetadata("g", DataTypes.STRING(), null, true)
                .column("ts", DataTypes.TIMESTAMP(3))
                .withComment("just a comment");

        Map<String, String> options = new HashMap<>();
        options.put("k", "v");
        options.put("connector", "dummy");

        switch (numOfPkFields) {
            case 0:
                // No primary key requested.
                break;
            case 1:
                builder.primaryKeyNamed("ct1", "a");
                break;
            case 2:
                builder.primaryKeyNamed("ct1", "a", "b");
                break;
            case 3:
                builder.primaryKeyNamed("ct1", "a", "b", "c");
                break;
            default:
                throw new IllegalArgumentException(String.format("Don't support to set pk with %s fields.", numOfPkFields));
        }
        if (hasWatermark) {
            builder.watermark("ts", "ts - interval '5' seconds");
        }

        List<String> partitionKeys = hasPartition ? Arrays.asList("b", "c") : Collections.emptyList();
        CatalogTable.Builder tableBuilder = CatalogTable.newBuilder()
                .schema(builder.build())
                .comment("a table")
                .partitionKeys(partitionKeys)
                .options(Collections.unmodifiableMap(options));
        if (tableDistribution != null) {
            tableBuilder.distribution(tableDistribution);
        }
        CatalogTable catalogTable = tableBuilder.build();

        this.catalogManager.setCurrentCatalog("cat1");
        this.catalogManager.setCurrentDatabase("db1");
        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", tableName);
        this.catalogManager.createTable(catalogTable, tableIdentifier, true);
    }

    /**
     * Asserts that {@code operation} is an {@link AlterTableChangeOperation} with the expected
     * target table, resulting options, table changes, and summary string.
     *
     * @param operation operation produced by the conversion under test
     * @param expectedIdentifier identifier the operation must target
     * @param expectedOptions options expected on the operation's new table
     * @param expectedChanges table changes the operation must carry
     * @param expectedSummary expected result of {@code asSummaryString()}
     */
    private void assertAlterTableOptions(Operation operation, ObjectIdentifier expectedIdentifier, Map<String, String> expectedOptions, List<TableChange> expectedChanges, String expectedSummary) {
        Assertions.assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
        AlterTableChangeOperation alterTableOptionsOperation = (AlterTableChangeOperation) operation;
        Assertions.assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
        Assertions.assertThat(alterTableOptionsOperation.getNewTable().getOptions()).isEqualTo(expectedOptions);
        // Actual value goes into assertThat(...) so a failure reports expected/actual correctly.
        Assertions.assertThat(alterTableOptionsOperation.getTableChanges()).isEqualTo(expectedChanges);
        Assertions.assertThat(alterTableOptionsOperation.asSummaryString()).isEqualTo(expectedSummary);
    }

    /**
     * Asserts that {@code operation} is an {@link AlterTableChangeOperation} targeting the
     * expected table and that its new table has the expected unresolved schema.
     *
     * @param operation operation produced by the conversion under test
     * @param expectedIdentifier identifier the operation must target
     * @param expectedSchema schema expected on the operation's new table
     */
    private void assertAlterTableSchema(Operation operation, ObjectIdentifier expectedIdentifier, Schema expectedSchema) {
        Assertions.assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
        final AlterTableChangeOperation alterOperation = (AlterTableChangeOperation) operation;

        final ObjectIdentifier actualIdentifier = alterOperation.getTableIdentifier();
        Assertions.assertThat(actualIdentifier).isEqualTo(expectedIdentifier);

        final Schema actualSchema = alterOperation.getNewTable().getUnresolvedSchema();
        Assertions.assertThat(actualSchema).isEqualTo(expectedSchema);
    }

    /**
     * Asserts that {@code operation} is an {@link AlterTableChangeOperation} targeting the
     * expected table, that its new table carries the given distribution, and that its summary
     * string matches.
     *
     * @param operation operation produced by the conversion under test
     * @param expectedIdentifier identifier the operation must target
     * @param distribution distribution expected on the operation's new table
     * @param expectedSummaryString expected result of {@code asSummaryString()}
     */
    private void assertAlterTableDistribution(Operation operation, ObjectIdentifier expectedIdentifier, TableDistribution distribution, String expectedSummaryString) {
        Assertions.assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
        final AlterTableChangeOperation alterOperation = (AlterTableChangeOperation) operation;

        final ObjectIdentifier actualIdentifier = alterOperation.getTableIdentifier();
        Assertions.assertThat(actualIdentifier).isEqualTo(expectedIdentifier);

        final Optional<TableDistribution> actualDistribution = alterOperation.getNewTable().getDistribution();
        Assertions.assertThat(actualDistribution).contains(distribution);

        Assertions.assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
    }

    /**
     * Checks both outcomes of altering a table that does not exist.
     *
     * <p>With {@code "if exists "} substituted into the template, parsing must yield a
     * {@link NopOperation}; with the empty string substituted, parsing must fail with a
     * {@link ValidationException} naming {@code cat1.db1.nonexistent}.
     *
     * @param sqlTemplate format string with one placeholder for the optional
     *     {@code if exists} clause
     */
    private void checkAlterNonExistTable(String sqlTemplate) {
        final String guardedSql = String.format(sqlTemplate, "if exists ");
        Assertions.assertThat(this.parse(guardedSql)).isInstanceOf(NopOperation.class);

        final String unguardedSql = String.format(sqlTemplate, "");
        Assertions.assertThatThrownBy(() -> this.parse(unguardedSql))
                .isInstanceOf(ValidationException.class)
                .hasMessageContaining("Table `cat1`.`db1`.`nonexistent` doesn't exist or is a temporary table.");
    }

    /**
     * Parses {@code sql} with the default-dialect parser and converts the resulting node into
     * an {@link Operation} using the default-dialect planner and this test's catalog manager.
     *
     * @param sql statement to parse and convert
     * @return the converted operation
     * @throws java.util.NoSuchElementException if the conversion returns an empty result
     */
    private Operation parseAndConvert(String sql) {
        // The planner is obtained before the parser, matching the original call order.
        final FlinkPlannerImpl defaultPlanner = this.getPlannerBySqlDialect(SqlDialect.DEFAULT);
        final SqlNode parsedNode = this.getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql);
        final Optional<Operation> converted =
                SqlNodeToOperationConversion.convert(defaultPlanner, this.catalogManager, parsedNode);
        return converted.get();
    }

    /**
     * Holder pairing a test expression with the expectation recorded for it: either an
     * expected type or an expected error message. Instances are created through
     * {@link #fromTestExpr(String)} and filled in with the fluent {@code with...} methods.
     */
    private static class TestItem {
        /** Expression under test; also the item's string representation. */
        private final String testExpr;
        /** Expected type, assigned by {@link #withExpectedType(Object)}. */
        @Nullable
        private Object expectedType;
        /** Expected error message, assigned by {@link #withExpectedError(String)}. */
        @Nullable
        private String expectedError;

        private TestItem(String testExpr) {
            this.testExpr = testExpr;
        }

        /**
         * Creates an item for the given expression with no expectation set.
         *
         * @param testExpr expression under test
         * @return a new item
         */
        static TestItem fromTestExpr(String testExpr) {
            return new TestItem(testExpr);
        }

        /**
         * Records the expected type.
         *
         * @param expectedType expected type for the expression
         * @return this item, for chaining
         */
        TestItem withExpectedType(Object expectedType) {
            this.expectedType = expectedType;
            return this;
        }

        /**
         * Records the expected error message.
         *
         * @param expectedError expected error message for the expression
         * @return this item, for chaining
         */
        TestItem withExpectedError(String expectedError) {
            this.expectedError = expectedError;
            return this;
        }

        /** Returns the test expression. */
        @Override
        public String toString() {
            return this.testExpr;
        }
    }
}

