/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.artifact;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.cli.ArtifactFetchOptions;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.kubernetes.artifact.KubernetesArtifactUploader;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultKubernetesArtifactUploader
implements KubernetesArtifactUploader {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKubernetesArtifactUploader.class);

    @Override
    public void uploadAll(Configuration config) throws Exception {
        if (!((Boolean)config.get(KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED)).booleanValue()) {
            LOG.info("Local artifact uploading is disabled. Set '{}' to enable.", (Object)KubernetesConfigOptions.LOCAL_UPLOAD_ENABLED.key());
            return;
        }
        String jobUri = this.upload(config, this.getJobUri(config));
        this.updateConfig(config, (ConfigOption<List<String>>)PipelineOptions.JARS, Collections.singletonList(jobUri));
        List additionalUris = config.getOptional(ArtifactFetchOptions.ARTIFACT_LIST).orElse(Collections.emptyList());
        List<String> uploadedAdditionalUris = additionalUris.stream().map(FunctionUtils.uncheckedFunction(artifactUri -> this.upload(config, (String)artifactUri))).collect(Collectors.toList());
        this.updateConfig(config, (ConfigOption<List<String>>)ArtifactFetchOptions.ARTIFACT_LIST, uploadedAdditionalUris);
    }

    @VisibleForTesting
    String upload(Configuration config, String artifactUriStr) throws IOException, URISyntaxException {
        URI artifactUri = PackagedProgramUtils.resolveURI((String)artifactUriStr);
        if (!"local".equals(artifactUri.getScheme())) {
            return artifactUriStr;
        }
        String targetDir = (String)config.get(KubernetesConfigOptions.LOCAL_UPLOAD_TARGET);
        Preconditions.checkArgument((!StringUtils.isNullOrWhitespaceOnly((String)targetDir) ? 1 : 0) != 0, (Object)String.format("Setting '%s' to a valid remote path is required.", KubernetesConfigOptions.LOCAL_UPLOAD_TARGET.key()));
        FileSystem.WriteMode writeMode = (Boolean)config.get(KubernetesConfigOptions.LOCAL_UPLOAD_OVERWRITE) != false ? FileSystem.WriteMode.OVERWRITE : FileSystem.WriteMode.NO_OVERWRITE;
        File src = new File(artifactUri.getPath());
        Path target = new Path(targetDir, src.getName());
        if (target.getFileSystem().exists(target) && writeMode == FileSystem.WriteMode.NO_OVERWRITE) {
            LOG.info("Skip uploading artifact '{}', as it already exists. To overwrite existing artifacts, please set the '{}' config option.", (Object)target, (Object)KubernetesConfigOptions.LOCAL_UPLOAD_OVERWRITE.key());
        } else {
            long start = System.currentTimeMillis();
            FileSystem fs = target.getFileSystem();
            try (FSDataOutputStream os = fs.create(target, writeMode);){
                FileUtils.copyFile((File)src, (OutputStream)os);
            }
            LOG.debug("Copied file from {} to {}, cost {} ms", new Object[]{src, target, System.currentTimeMillis() - start});
        }
        return target.toString();
    }

    @VisibleForTesting
    void updateConfig(Configuration config, ConfigOption<List<String>> configOption, List<String> newValue) {
        List<String> originalValue = config.getOptional(configOption).orElse(Collections.emptyList());
        if (this.hasLocal(originalValue)) {
            LOG.info("Updating configuration '{}' after to replace local artifact: '{}'", (Object)configOption.key(), newValue);
            config.set(configOption, newValue);
        }
    }

    private String getJobUri(Configuration config) {
        List jars = (List)config.get(PipelineOptions.JARS);
        Preconditions.checkArgument((jars.size() == 1 ? 1 : 0) != 0, (Object)String.format("The '%s' config must contain one JAR.", PipelineOptions.JARS.key()));
        return (String)((List)config.get(PipelineOptions.JARS)).get(0);
    }

    private boolean hasLocal(List<String> originalUris) {
        return originalUris.stream().anyMatch(uri -> uri.contains("local:/"));
    }
}

