/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.connector.file.table.FileSystemCommitterTest;
import org.apache.flink.connector.file.table.FileSystemOutputFormat;
import org.apache.flink.connector.file.table.PartitionComputer;
import org.apache.flink.connector.file.table.RowPartitionComputer;
import org.apache.flink.connector.file.table.TableMetaStoreFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.legacy.io.TextOutputFormat;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link FileSystemOutputFormat}.
 *
 * <p>Each write test pushes rows through a {@link StreamSink} wrapping the output format, checks
 * the number of files found below {@link #stagingBaseDir} before finalization, then calls {@code
 * finalizeGlobal} and verifies that the staging base directory is empty and that {@link
 * #outputPath} holds the expected file contents.
 */
class FileSystemOutputFormatTest {

    /** Directory handed to the test meta store factory; inspected for the committed files. */
    @TempDir
    private java.nio.file.Path outputPath;

    /** Directory handed to {@code Builder#setPath}; inspected for not-yet-committed files. */
    @TempDir
    private java.nio.file.Path stagingBaseDir;

    private final TestingFinalizationContext finalizationContext = new TestingFinalizationContext();

    /** Four three-column rows: three of them carry {@code "p1"} and one {@code "p2"} as last field. */
    private static final Supplier<List<StreamRecord<Row>>> DEFAULT_INPUT_SUPPLIER =
            () ->
                    Arrays.asList(
                            new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
                            new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
                            new StreamRecord<>(Row.of("a2", 2, "p2"), 1L),
                            new StreamRecord<>(Row.of("a3", 3, "p1"), 1L));

    /** Expected content of the single file produced for {@link #DEFAULT_INPUT_SUPPLIER}. */
    private static final Supplier<List<String>> DEFAULT_OUTPUT_SUPPLIER =
            () ->
                    Collections.singletonList(
                            createFileContent("a1,1,p1", "a2,2,p1", "a2,2,p2", "a3,3,p1"));

    /**
     * Recursively collects all files below {@code directory} together with their content.
     *
     * @param directory root directory to scan
     * @return map from file to its content; empty if {@code directory} does not exist or is not a
     *     directory
     * @throws IOException if a file cannot be read
     */
    private static Map<File, String> getFileContentByPath(java.nio.file.Path directory)
            throws IOException {
        Map<File, String> contents = new HashMap<>(4);
        if (Files.notExists(directory) || !Files.isDirectory(directory)) {
            return contents;
        }

        Collection<File> filesInBucket = FileUtils.listFiles(directory.toFile(), null, true);
        for (File file : filesInBucket) {
            // Read with an explicit charset so the comparison does not depend on the platform
            // default encoding.
            contents.put(
                    file,
                    FileUtils.readFileToString(file, java.nio.charset.StandardCharsets.UTF_8));
        }
        return contents;
    }

    /** Joins {@code rows} with {@code '\n'} and appends a trailing {@code '\n'}. */
    private static String createFileContent(String... rows) {
        return Arrays.stream(rows).collect(Collectors.joining("\n", "", "\n"));
    }

    /** Enables the legacy {@code Row} string representation for the duration of a test. */
    @BeforeEach
    void before() {
        RowUtils.USE_LEGACY_TO_STRING = true;
    }

    /** Switches the legacy {@code Row} string representation off again. */
    @AfterEach
    void after() {
        RowUtils.USE_LEGACY_TO_STRING = false;
    }

    /** Opening and closing the sink without processing any element must not fail. */
    @Test
    void testClosingWithoutInput() throws Exception {
        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
                createTestHarness(createSinkFormat(false, false, false, new LinkedHashMap<>()))) {
            testHarness.setup();
            testHarness.open();
        }
    }

    @Test
    void testNonPartition() throws Exception {
        checkWriteAndCommit(
                false,
                false,
                false,
                new LinkedHashMap<>(),
                DEFAULT_INPUT_SUPPLIER,
                DEFAULT_OUTPUT_SUPPLIER);
    }

    /** Writes once without overwrite, then again with overwrite, expecting the same output. */
    @Test
    void testOverrideNonPartition() throws Exception {
        testNonPartition();
        checkWriteAndCommit(
                true,
                false,
                false,
                new LinkedHashMap<>(),
                DEFAULT_INPUT_SUPPLIER,
                DEFAULT_OUTPUT_SUPPLIER);
    }

    @Test
    void testStaticPartition() throws Exception {
        LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
        staticParts.put("c", "p1");

        checkWriteAndCommit(
                false,
                true,
                false,
                staticParts,
                () ->
                        Arrays.asList(
                                new StreamRecord<>(Row.of("a1", 1), 1L),
                                new StreamRecord<>(Row.of("a2", 2), 1L),
                                new StreamRecord<>(Row.of("a2", 2), 1L),
                                new StreamRecord<>(Row.of("a3", 3), 1L)),
                () ->
                        Collections.singletonMap(
                                "c=p1", createFileContent("a1,1", "a2,2", "a2,2", "a3,3")));
    }

    @Test
    void testDynamicPartition() throws Exception {
        checkWriteAndCommit(
                false,
                true,
                false,
                new LinkedHashMap<>(),
                DEFAULT_INPUT_SUPPLIER,
                () ->
                        ImmutableMap.of(
                                "c=p1",
                                createFileContent("a1,1", "a2,2", "a3,3"),
                                "c=p2",
                                createFileContent("a2,2")));
    }

    @Test
    void testGroupedDynamicPartition() throws Exception {
        checkWriteAndCommit(
                false,
                true,
                true,
                new LinkedHashMap<>(),
                // Same rows as the default input, but ordered so that equal partition values are
                // adjacent.
                () ->
                        Arrays.asList(
                                new StreamRecord<>(Row.of("a1", 1, "p1"), 1L),
                                new StreamRecord<>(Row.of("a2", 2, "p1"), 1L),
                                new StreamRecord<>(Row.of("a3", 3, "p1"), 1L),
                                new StreamRecord<>(Row.of("a2", 2, "p2"), 1L)),
                () ->
                        ImmutableMap.of(
                                "c=p1",
                                createFileContent("a1,1", "a2,2", "a3,3"),
                                "c=p2",
                                createFileContent("a2,2")));
    }

    /** Building a format whose staging path already exists must be rejected. */
    @Test
    void testGetUniqueStagingDirectory() throws IOException {
        Path alreadyExistingStagingDir = new Path(outputPath.toFile().getAbsolutePath());
        Assertions.assertThat(
                        alreadyExistingStagingDir.getFileSystem().exists(alreadyExistingStagingDir))
                .as("The staging folder should already exist.")
                .isTrue();

        FileSystemOutputFormat.Builder<Row> builder =
                new FileSystemOutputFormat.Builder<Row>()
                        .setPartitionColumns(new String[0])
                        .setFormatFactory(TextOutputFormat::new)
                        .setMetaStoreFactory(
                                new FileSystemCommitterTest.TestMetaStoreFactory(
                                        new Path(outputPath.toFile().getAbsolutePath())))
                        .setPartitionComputer(
                                new RowPartitionComputer("default", new String[0], new String[0]))
                        .setStagingPath(alreadyExistingStagingDir);

        Assertions.assertThatThrownBy(builder::build)
                .as("Reusing a folder should cause an error.")
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Writes the supplied records through a freshly created output format, finalizes it and
     * verifies both the staging base directory and the output directory.
     *
     * @param override value passed to {@code Builder#setOverwrite}
     * @param partitioned whether column {@code "c"} is used as partition column; also selects the
     *     expected-output shape (see {@code outputSupplier})
     * @param dynamicGrouped value passed to {@code Builder#setDynamicGrouped}
     * @param staticPartitions value passed to {@code Builder#setStaticPartitions}
     * @param inputSupplier supplies the records to process
     * @param outputSupplier supplies a {@code Map<String, String>} (parent directory name to file
     *     content) when {@code partitioned} is true, otherwise a {@code List<String>} of file
     *     contents
     */
    @SuppressWarnings("unchecked") // the shape of the expected output is selected by 'partitioned'
    private void checkWriteAndCommit(
            boolean override,
            boolean partitioned,
            boolean dynamicGrouped,
            LinkedHashMap<String, String> staticPartitions,
            Supplier<List<StreamRecord<Row>>> inputSupplier,
            Supplier<?> outputSupplier)
            throws Exception {
        Object expectedOutput = outputSupplier.get();
        int expectedFileNum =
                partitioned
                        ? ((Map<String, String>) expectedOutput).size()
                        : ((List<String>) expectedOutput).size();

        FileSystemOutputFormat<Row> outputFormat =
                createSinkFormat(override, partitioned, dynamicGrouped, staticPartitions);
        try (OneInputStreamOperatorTestHarness<Row, Object> testHarness =
                createTestHarness(outputFormat)) {
            testHarness.setup();
            testHarness.open();
            for (StreamRecord<Row> record : inputSupplier.get()) {
                testHarness.processElement(record);
            }
            // Before finalization the written files are expected below the staging base directory.
            Assertions.assertThat(getFileContentByPath(stagingBaseDir)).hasSize(expectedFileNum);
        }

        outputFormat.finalizeGlobal(finalizationContext);
        Assertions.assertThat(stagingBaseDir).isEmptyDirectory();

        Map<File, String> fileToContent = getFileContentByPath(outputPath);
        Assertions.assertThat(fileToContent).hasSize(expectedFileNum);

        if (partitioned) {
            // Key each file by the name of its parent directory, e.g. "c=p1".
            Map<String, String> partitionToContent =
                    fileToContent.entrySet().stream()
                            .collect(
                                    Collectors.toMap(
                                            e -> e.getKey().getParentFile().getName(),
                                            Map.Entry::getValue));
            Assertions.assertThat(partitionToContent)
                    .containsExactlyInAnyOrderEntriesOf((Map<String, String>) expectedOutput);
        } else {
            Assertions.assertThat(fileToContent.values())
                    .containsExactlyInAnyOrderElementsOf((List<String>) expectedOutput);
        }
    }

    /**
     * Builds a text-based {@link FileSystemOutputFormat} over the columns {@code a, b, c}.
     *
     * @param override value passed to {@code Builder#setOverwrite}
     * @param partition if true, {@code "c"} is the only partition column; otherwise there is none
     * @param dynamicGrouped value passed to {@code Builder#setDynamicGrouped}
     * @param staticPartitions value passed to {@code Builder#setStaticPartitions}
     */
    private FileSystemOutputFormat<Row> createSinkFormat(
            boolean override,
            boolean partition,
            boolean dynamicGrouped,
            LinkedHashMap<String, String> staticPartitions) {
        String[] columnNames = {"a", "b", "c"};
        String[] partitionColumns = partition ? new String[] {"c"} : new String[0];
        TableMetaStoreFactory msFactory =
                new FileSystemCommitterTest.TestMetaStoreFactory(new Path(outputPath.toString()));

        return new FileSystemOutputFormat.Builder<Row>()
                .setMetaStoreFactory(msFactory)
                .setPath(new Path(stagingBaseDir.toString()))
                .setOverwrite(override)
                .setPartitionColumns(partitionColumns)
                .setPartitionComputer(
                        new RowPartitionComputer("default", columnNames, partitionColumns))
                .setFormatFactory(TextOutputFormat::new)
                .setDynamicGrouped(dynamicGrouped)
                .setStaticPartitions(staticPartitions)
                .build();
    }

    /** Wraps {@code outputFormat} into a sink operator test harness. */
    private OneInputStreamOperatorTestHarness<Row, Object> createTestHarness(
            FileSystemOutputFormat<Row> outputFormat) throws Exception {
        // Harness arguments after the operator: max parallelism 3, parallelism 3, subtask index 0.
        return new OneInputStreamOperatorTestHarness<>(
                new StreamSink<>(new OutputFormatSinkFunction<>(outputFormat)), 3, 3, 0);
    }

    /** Finalization context stub reporting parallelism 1 and attempt 0 for every subtask. */
    private static class TestingFinalizationContext
            implements FinalizeOnMaster.FinalizationContext {

        @Override
        public int getParallelism() {
            return 1;
        }

        @Override
        public int getFinishedAttempt(int subtaskIndex) {
            return 0;
        }
    }
}

