/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.factories;

import java.io.File;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.table.catalog.StagedTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.FileUtils;

public class TestSupportsStagingTableFactory
implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "test-staging";
    public static final List<String> JOB_STATUS_CHANGE_PROCESS = new LinkedList<String>();
    public static final List<SupportsStaging.StagingPurpose> STAGING_PURPOSE_LIST = new LinkedList<SupportsStaging.StagingPurpose>();
    private static final ConfigOption<String> DATA_DIR = ConfigOptions.key((String)"data-dir").stringType().noDefaultValue().withDescription("The data id used to write the rows.");
    private static final ConfigOption<Boolean> SINK_FAIL = ConfigOptions.key((String)"sink-fail").booleanType().defaultValue((Object)false).withDescription("If set to true, then sink will throw an exception causing the job to fail, used to verify the TestStagedTable#abort.");

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        String dataDir = (String)helper.getOptions().get(DATA_DIR);
        boolean sinkFail = (Boolean)helper.getOptions().get(SINK_FAIL);
        return new SupportsStagingTableSink(dataDir, sinkFail);
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(DATA_DIR);
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.singleton(SINK_FAIL);
    }

    private static class SupportsStagingTableSink
    implements DynamicTableSink,
    SupportsStaging {
        private final String dataDir;
        private final boolean sinkFail;
        private TestStagedTable stagedTable;

        public SupportsStagingTableSink(String dataDir, boolean sinkFail) {
            this(dataDir, sinkFail, null);
        }

        public SupportsStagingTableSink(String dataDir, boolean sinkFail, TestStagedTable stagedTable) {
            this.dataDir = dataDir;
            this.sinkFail = sinkFail;
            this.stagedTable = stagedTable;
        }

        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.insertOnly();
        }

        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            return new DataStreamSinkProvider(){

                public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                    return dataStream.addSink((SinkFunction)new StagedSinkFunction(dataDir, sinkFail)).setParallelism(1);
                }
            };
        }

        public DynamicTableSink copy() {
            return new SupportsStagingTableSink(this.dataDir, this.sinkFail, this.stagedTable);
        }

        public String asSummaryString() {
            return "SupportsStagingTableSink";
        }

        public StagedTable applyStaging(SupportsStaging.StagingContext context) {
            JOB_STATUS_CHANGE_PROCESS.clear();
            STAGING_PURPOSE_LIST.clear();
            this.stagedTable = new TestStagedTable(this.dataDir);
            STAGING_PURPOSE_LIST.add(context.getStagingPurpose());
            return this.stagedTable;
        }
    }

    private static class TestStagedTable
    implements StagedTable {
        private final String dataDir;

        public TestStagedTable(String dataDir) {
            this.dataDir = dataDir;
        }

        public void begin() {
            JOB_STATUS_CHANGE_PROCESS.add("begin");
        }

        public void commit() {
            JOB_STATUS_CHANGE_PROCESS.add("commit");
            new File(this.dataDir, "_data").renameTo(new File(this.dataDir, "data"));
        }

        public void abort() {
            JOB_STATUS_CHANGE_PROCESS.add("abort");
        }
    }

    private static class StagedSinkFunction
    extends RichSinkFunction<RowData> {
        private final String dataDir;
        private final boolean sinkFail;

        public StagedSinkFunction(String dataDir, boolean sinkFail) {
            this.dataDir = dataDir;
            this.sinkFail = sinkFail;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            File parentDir = new File(this.dataDir);
            if (parentDir.exists()) {
                parentDir.delete();
            }
            parentDir.mkdirs();
            new File(this.dataDir, "_data").createNewFile();
        }

        public void invoke(RowData value, SinkFunction.Context context) throws Exception {
            if (this.sinkFail) {
                throw new RuntimeException("Test StagedTable abort method.");
            }
            FileUtils.writeFileUtf8((File)new File(this.dataDir, "_data"), (String)(value.getInt(0) + "," + value.getString(1)));
        }
    }
}

