/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.writer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.committer.FileCommitter;
import org.apache.flink.connector.file.sink.writer.FileWriterBucket;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateGenerator;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStatePathResolver;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class FileWriterBucketStateSerializerMigrationTest {
    private static final int CURRENT_VERSION = 2;
    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";
    private static final String BUCKET_ID = "test-bucket";
    private static final java.nio.file.Path BASE_PATH = Paths.get("src/test/resources/", new String[0]).resolve("bucket-state-migration-test");
    private final BucketStateGenerator generator = new BucketStateGenerator("test-bucket", "writing", "wrote", BASE_PATH, 2);

    FileWriterBucketStateSerializerMigrationTest() {
    }

    static Stream<Integer> previousVersions() {
        return Stream.of(1, 2);
    }

    @Test
    @Disabled
    void prepareDeserializationEmpty() throws IOException {
        this.generator.prepareDeserializationEmpty();
    }

    @ParameterizedTest(name="Previous Version = {0}")
    @MethodSource(value={"previousVersions"})
    void testSerializationEmpty(int previousVersion) throws IOException {
        String scenarioName = "empty";
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion);
        java.nio.file.Path outputPath = pathResolver.getOutputPath("empty");
        Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
        FileWriterBucketState recoveredState = FileWriterBucketStateSerializerMigrationTest.readBucketState("empty", previousVersion);
        FileWriterBucket<String> bucket = FileWriterBucketStateSerializerMigrationTest.restoreBucket(recoveredState);
        Assertions.assertThat((Object)bucket.getBucketPath()).isEqualTo((Object)testBucketPath);
        Assertions.assertThat((Object)bucket.getInProgressPart()).isNull();
        Assertions.assertThat((List)bucket.getPendingFiles()).isEmpty();
    }

    @Test
    @Disabled
    void prepareDeserializationOnlyInProgress() throws IOException {
        this.generator.prepareDeserializationOnlyInProgress();
    }

    @ParameterizedTest(name="Previous Version = {0}")
    @MethodSource(value={"previousVersions"})
    void testSerializationOnlyInProgress(int previousVersion) throws IOException {
        String scenarioName = "only-in-progress";
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion);
        java.nio.file.Path outputPath = pathResolver.getOutputPath("only-in-progress");
        Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
        FileWriterBucketState recoveredState = FileWriterBucketStateSerializerMigrationTest.readBucketState("only-in-progress", previousVersion);
        FileWriterBucket<String> bucket = FileWriterBucketStateSerializerMigrationTest.restoreBucket(recoveredState);
        Assertions.assertThat((Object)bucket.getBucketPath()).isEqualTo((Object)testBucketPath);
        Assertions.assertThat((long)bucket.getInProgressPart().getSize()).isEqualTo(8L);
        long numFiles = Files.list(Paths.get(testBucketPath.toString(), new String[0])).map(file -> {
            Assertions.assertThat((String)file.getFileName().toString()).startsWith((CharSequence)".part-0-0.inprogress");
            return 1;
        }).count();
        Assertions.assertThat((long)numFiles).isEqualTo(1L);
    }

    @Test
    @Disabled
    void prepareDeserializationFull() throws IOException {
        this.generator.prepareDeserializationFull();
    }

    @ParameterizedTest(name="Previous Version = {0}")
    @MethodSource(value={"previousVersions"})
    void testSerializationFull(int previousVersion) throws IOException, InterruptedException {
        this.testDeserializationFull(previousVersion, true, "full");
    }

    @Test
    @Disabled
    void prepareDeserializationNullInProgress() throws IOException {
        this.generator.prepareDeserializationNullInProgress();
    }

    @ParameterizedTest(name="Previous Version = {0}")
    @MethodSource(value={"previousVersions"})
    void testSerializationNullInProgress(int previousVersion) throws IOException, InterruptedException {
        this.testDeserializationFull(previousVersion, false, "full-no-in-progress");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testDeserializationFull(int previousVersion, boolean withInProgress, String scenarioName) throws IOException, InterruptedException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion);
        try {
            java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName);
            Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
            FileWriterBucketState recoveredState = FileWriterBucketStateSerializerMigrationTest.readBucketStateFromTemplate(scenarioName, previousVersion);
            int noOfPendingCheckpoints = 5;
            Map pendingFileRecoverables = recoveredState.getPendingFileRecoverablesPerCheckpoint();
            Assertions.assertThat((Map)pendingFileRecoverables).hasSize(5);
            Set beforeRestorePaths = Files.list(outputPath.resolve(BUCKET_ID)).map(file -> file.getFileName().toString()).collect(Collectors.toSet());
            for (int i = 0; i < 5; ++i) {
                String part = ".part-0-" + i + ".inprogress";
                Assertions.assertThat(beforeRestorePaths).anyMatch(s -> s.startsWith(part));
            }
            FileWriterBucket<String> bucket = FileWriterBucketStateSerializerMigrationTest.restoreBucket(recoveredState);
            Assertions.assertThat((Object)bucket.getBucketPath()).isEqualTo((Object)testBucketPath);
            Assertions.assertThat((List)bucket.getPendingFiles()).hasSize(5);
            bucket.snapshotState();
            Collection committables = bucket.prepareCommit(false).stream().map(MockCommitRequest::new).collect(Collectors.toList());
            FileCommitter committer = new FileCommitter(FileWriterBucketStateSerializerMigrationTest.createBucketWriter());
            committer.commit(committables);
            Set afterRestorePaths = Files.list(outputPath.resolve(BUCKET_ID)).map(file -> file.getFileName().toString()).collect(Collectors.toSet());
            for (int i = 0; i < 5; ++i) {
                String part = "part-0-" + i;
                Assertions.assertThat(afterRestorePaths).contains((Object[])new String[]{part});
                afterRestorePaths.remove(part);
            }
            if (withInProgress) {
                Assertions.assertThat(afterRestorePaths).hasSize(1);
                Assertions.assertThat(afterRestorePaths).anyMatch(s -> s.startsWith(".part-0-5.inprogress"));
            } else {
                Assertions.assertThat(afterRestorePaths).isEmpty();
            }
        }
        finally {
            FileUtils.deleteDirectory((File)pathResolver.getResourcePath(scenarioName).toFile());
        }
    }

    private static FileWriterBucket<String> restoreBucket(FileWriterBucketState bucketState) throws IOException {
        return FileWriterBucket.restore(FileWriterBucketStateSerializerMigrationTest.createBucketWriter(), (RollingPolicy)DefaultRollingPolicy.builder().withMaxPartSize(new MemorySize(10L)).build(), (FileWriterBucketState)bucketState, (OutputFileConfig)OutputFileConfig.builder().build());
    }

    private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
        return new RowWiseBucketWriter(FileSystem.getLocalFileSystem().createRecoverableWriter(), (Encoder)new SimpleStringEncoder());
    }

    private static SimpleVersionedSerializer<FileWriterBucketState> bucketStateSerializer() throws IOException {
        RowWiseBucketWriter<String, String> bucketWriter = FileWriterBucketStateSerializerMigrationTest.createBucketWriter();
        return new FileWriterBucketStateSerializer(bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), bucketWriter.getProperties().getPendingFileRecoverableSerializer());
    }

    private static FileWriterBucketState readBucketState(String scenarioName, int version) throws IOException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version);
        byte[] bytes = Files.readAllBytes(pathResolver.getSnapshotPath(scenarioName));
        return (FileWriterBucketState)SimpleVersionedSerialization.readVersionAndDeSerialize(FileWriterBucketStateSerializerMigrationTest.bucketStateSerializer(), (byte[])bytes);
    }

    private static FileWriterBucketState readBucketStateFromTemplate(String scenarioName, int version) throws IOException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version);
        java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName);
        FileUtils.deleteDirectory((File)scenarioPath.toFile());
        FileUtils.copy((Path)new Path(scenarioPath.toString() + "-template"), (Path)new Path(scenarioPath.toString()), (boolean)false);
        return FileWriterBucketStateSerializerMigrationTest.readBucketState(scenarioName, version);
    }
}

