/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.connector.file.table.TestCustomCommitPolicy;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for tables declared with {@code 'connector' = 'filesystem'} when used as a sink.
 *
 * <p>The tests cover three things:
 *
 * <ul>
 *   <li>planning an {@code INSERT INTO ... SELECT DISTINCT ...} into a sink that sets
 *       {@code 'sink.parallelism'} is rejected with a {@link ValidationException};
 *   <li>the JSON execution plan produced for a sink with {@code 'sink.parallelism'} matches the
 *       expected plan stored under {@code /explain/filesystem/} (streaming with and without
 *       {@code 'auto-compaction'}, and batch);
 *   <li>a custom partition commit policy is invoked with the expected parameters and partition
 *       paths.
 * </ul>
 */
class FileSystemTableSinkTest {

    /**
     * Explaining a {@code SELECT DISTINCT} insert into a sink declared with
     * {@code 'sink.parallelism' = '10'} must fail with a {@link ValidationException} carrying the
     * message asserted below.
     */
    @Test
    void testExceptionWhenSettingParallelismWithUpdatingQuery() {
        final TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        final String testSourceTableName = "test_source_table";
        tEnv.executeSql(buildSourceTableSql(testSourceTableName, false));

        final String testSinkTableName = "test_sink_table";
        tEnv.executeSql(buildSinkTableSql(testSinkTableName, 10, false));

        final String sql =
                String.format(
                        "INSERT INTO %s SELECT DISTINCT * FROM %s",
                        testSinkTableName, testSourceTableName);

        CommonTestUtils.assertThrows(
                "filesystem sink doesn't support setting parallelism (10) by 'sink.parallelism' when the input stream is not INSERT only.",
                ValidationException.class,
                () -> tEnv.explainSql(sql));
    }

    /**
     * In streaming mode, compares the JSON execution plan of a plain insert against the stored
     * expected plan, once for a sink without auto-compaction and once for a sink with it. The
     * environment's default parallelism (8) deliberately differs from the sink parallelism (5).
     */
    @Test
    void testFileSystemTableSinkWithParallelismInStreaming() {
        final int parallelism = 5;
        final TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);

        final String testSourceTableName = "test_source_table";
        tEnv.executeSql(buildSourceTableSql(testSourceTableName, false));

        // Sink without auto-compaction.
        final String testSinkTableName = "test_sink_table";
        tEnv.executeSql(buildSinkTableSql(testSinkTableName, parallelism, false));
        assertJsonPlanMatchesResource(
                tEnv,
                buildInsertIntoSql(testSinkTableName, testSourceTableName),
                "/explain/filesystem/testFileSystemTableSinkWithParallelismInStreamingSql0.out");

        // Sink with auto-compaction enabled.
        final String testCompactSinkTableName = "test_compact_sink_table";
        tEnv.executeSql(buildSinkTableSql(testCompactSinkTableName, parallelism, true));
        assertJsonPlanMatchesResource(
                tEnv,
                buildInsertIntoSql(testCompactSinkTableName, testSourceTableName),
                "/explain/filesystem/testFileSystemTableSinkWithParallelismInStreamingSql1.out");
    }

    /**
     * In batch mode, compares the JSON execution plan of a plain insert from a bounded source
     * against the stored expected plan. The environment's default parallelism (8) deliberately
     * differs from the sink parallelism (5).
     */
    @Test
    void testFileSystemTableSinkWithParallelismInBatch() {
        final int parallelism = 5;
        final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);

        final String testSourceTableName = "test_source_table";
        final String testSinkTableName = "test_sink_table";
        tEnv.executeSql(buildSourceTableSql(testSourceTableName, true));
        tEnv.executeSql(buildSinkTableSql(testSinkTableName, parallelism, false));

        assertJsonPlanMatchesResource(
                tEnv,
                buildInsertIntoSql(testSinkTableName, testSourceTableName),
                "/explain/filesystem/testFileSystemTableSinkWithParallelismInBatch.out");
    }

    /**
     * Runs an insert into a partitioned filesystem table configured with
     * {@link TestCustomCommitPolicy} and checks what the policy recorded: the concatenated
     * policy parameters plus one path per written partition.
     *
     * @throws Exception if waiting for the insert job fails
     */
    @Test
    void testFileSystemTableSinkWithCustomCommitPolicy() throws Exception {
        final String outputTable = "outputTable";
        final String customPartitionCommitPolicyClassName = TestCustomCommitPolicy.class.getName();
        final TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // NOTE(review): the table path is the fixed directory '/tmp'; the expected committed
        // paths below depend on it, so both must be changed together.
        final String ddl =
                String.format(
                        "CREATE TABLE %s (  a INT,  b STRING,  d STRING,  e STRING) PARTITIONED BY (d, e) WITH ('connector'='filesystem','path'='/tmp','format'='testcsv','sink.partition-commit.delay'='0s','sink.partition-commit.policy.kind'='custom','sink.partition-commit.policy.class'='%s','sink.partition-commit.policy.class.parameters'='test1;test2')",
                        outputTable, customPartitionCommitPolicyClassName);
        tEnv.executeSql(ddl);

        tEnv.executeSql(
                        String.format(
                                "insert into %s select * from (values (1, 'a', '2020-05-03', '3'), (2, 'x', '2020-05-03', '4'))",
                                outputTable))
                .await();

        final Set<String> actualCommittedPaths =
                TestCustomCommitPolicy.getCommittedPartitionPathsAndReset();
        final Set<String> expectedCommittedPaths =
                new HashSet<>(
                        Arrays.asList(
                                "test1test2", "/tmp/d=2020-05-03/e=3", "/tmp/d=2020-05-03/e=4"));
        Assertions.assertThat(actualCommittedPaths).isEqualTo(expectedCommittedPaths);
    }

    /**
     * Explains {@code insertSql} with {@link ExplainDetail#JSON_EXECUTION_PLAN} and asserts that
     * the result equals the content of the classpath resource {@code expectedResource}. Both
     * sides are passed through {@link #replaceIds(String)} before comparison.
     *
     * @param tEnv table environment in which the referenced tables are registered
     * @param insertSql the INSERT statement to explain
     * @param expectedResource resource path of the expected plan
     */
    private static void assertJsonPlanMatchesResource(
            TableEnvironment tEnv, String insertSql, String expectedResource) {
        final String actual = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN);
        final String expected = TableTestUtil.readFromResource(expectedResource);
        Assertions.assertThat(replaceIds(actual)).isEqualTo(replaceIds(expected));
    }

    /**
     * Applies {@code TableTestUtil.replaceStageId}, {@code replaceStreamNodeId} and
     * {@code replaceNodeIdInOperator}, in that order, to a plan string. Presumably this removes
     * run-specific ids so that plans are comparable across runs; see {@link TableTestUtil}.
     *
     * @param plan the plan text to process
     * @return the processed plan text
     */
    private static String replaceIds(String plan) {
        return TableTestUtil.replaceNodeIdInOperator(
                TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(plan)));
    }

    /**
     * Builds the DDL for a {@code 'connector' = 'values'} source table with four columns.
     *
     * @param testSourceTableName name of the table to create
     * @param bounded value written to the {@code 'bounded'} option
     * @return the CREATE TABLE statement
     */
    private static String buildSourceTableSql(String testSourceTableName, boolean bounded) {
        return String.format(
                "CREATE TABLE %s ( id BIGINT, real_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 4)) WITH ( 'connector' = 'values', 'bounded' = '%s')",
                testSourceTableName, bounded);
    }

    /**
     * Builds the DDL for a {@code 'connector' = 'filesystem'} sink table with path {@code /tmp}
     * and format {@code testcsv}.
     *
     * @param tableName name of the table to create
     * @param parallelism value written to the {@code 'sink.parallelism'} option
     * @param autoCompaction value written to the {@code 'auto-compaction'} option
     * @return the CREATE TABLE statement
     */
    private static String buildSinkTableSql(
            String tableName, int parallelism, boolean autoCompaction) {
        return String.format(
                "CREATE TABLE %s ( id BIGINT, real_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 4)) WITH ( 'connector' = 'filesystem', 'path' = '/tmp', 'auto-compaction' = '%s', 'format' = 'testcsv', 'sink.parallelism' = '%s')",
                tableName, autoCompaction, parallelism);
    }

    /**
     * Builds an {@code INSERT INTO <sink> SELECT * FROM <source>} statement.
     *
     * @param sinkTable name of the target table
     * @param sourceTable name of the table to read from
     * @return the INSERT statement
     */
    private static String buildInsertIntoSql(String sinkTable, String sourceTable) {
        return String.format("INSERT INTO %s SELECT * FROM %s", sinkTable, sourceTable);
    }
}

