/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.LinkOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.YarnLocalResourceDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class YarnApplicationFileUploader
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationFileUploader.class);
    private final FileSystem fileSystem;
    private final ApplicationId applicationId;
    private final Path homeDir;
    private final Path applicationDir;
    private final Map<String, FileStatus> providedSharedLibs;
    private final Map<String, LocalResource> localResources;
    private final int fileReplication;
    private final List<Path> remotePaths;
    private final List<YarnLocalResourceDescriptor> envShipResourceList;
    private YarnLocalResourceDescriptor flinkDist;

    private YarnApplicationFileUploader(FileSystem fileSystem, Path homeDir, List<Path> providedLibDirs, ApplicationId applicationId, int fileReplication) throws IOException {
        this.fileSystem = (FileSystem)Preconditions.checkNotNull((Object)fileSystem);
        this.homeDir = (Path)Preconditions.checkNotNull((Object)homeDir);
        this.applicationId = (ApplicationId)Preconditions.checkNotNull((Object)applicationId);
        this.localResources = new HashMap<String, LocalResource>();
        this.applicationDir = this.getApplicationDir(applicationId);
        Preconditions.checkArgument((!this.isUsrLibDirIncludedInProvidedLib(providedLibDirs) ? 1 : 0) != 0, (String)"Provided lib directories, configured via %s, should not include %s.", (Object[])new Object[]{YarnConfigOptions.PROVIDED_LIB_DIRS.key(), "usrlib"});
        this.providedSharedLibs = this.getAllFilesInProvidedLibDirs(providedLibDirs);
        this.remotePaths = new ArrayList<Path>();
        this.envShipResourceList = new ArrayList<YarnLocalResourceDescriptor>();
        Preconditions.checkArgument((fileReplication >= 1 ? 1 : 0) != 0);
        this.fileReplication = fileReplication;
    }

    Map<String, LocalResource> getRegisteredLocalResources() {
        return this.localResources;
    }

    List<Path> getRemotePaths() {
        return this.remotePaths;
    }

    List<YarnLocalResourceDescriptor> getEnvShipResourceList() {
        return this.envShipResourceList;
    }

    Path getHomeDir() {
        return this.homeDir;
    }

    Path getApplicationDir() {
        return this.applicationDir;
    }

    @Override
    public void close() {
        IOUtils.closeQuietly((AutoCloseable)this.fileSystem);
    }

    YarnLocalResourceDescriptor registerSingleLocalResource(String key, Path resourcePath, String relativeDstPath, LocalResourceType resourceType, boolean whetherToAddToRemotePaths, boolean whetherToAddToEnvShipResourceList) throws IOException {
        this.addToRemotePaths(whetherToAddToRemotePaths, resourcePath);
        if (Utils.isRemotePath(resourcePath.toString())) {
            FileStatus fileStatus = this.fileSystem.getFileStatus(resourcePath);
            LOG.debug("Using remote file {} to register local resource", (Object)fileStatus.getPath());
            YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor.fromFileStatus(key, fileStatus, LocalResourceVisibility.APPLICATION, resourceType);
            this.addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
            this.localResources.put(key, descriptor.toLocalResource());
            return descriptor;
        }
        File localFile = new File(resourcePath.toUri().getPath());
        Tuple2<Path, Long> remoteFileInfo = this.uploadLocalFileToRemote(resourcePath, relativeDstPath);
        YarnLocalResourceDescriptor descriptor = new YarnLocalResourceDescriptor(key, (Path)remoteFileInfo.f0, localFile.length(), (Long)remoteFileInfo.f1, LocalResourceVisibility.APPLICATION, resourceType);
        this.addToEnvShipResourceList(whetherToAddToEnvShipResourceList, descriptor);
        this.localResources.put(key, descriptor.toLocalResource());
        return descriptor;
    }

    Tuple2<Path, Long> uploadLocalFileToRemote(Path localSrcPath, String relativeDstPath) throws IOException {
        File localFile = new File(localSrcPath.toUri().getPath());
        Preconditions.checkArgument((!localFile.isDirectory() ? 1 : 0) != 0, (Object)("File to copy cannot be a directory: " + localSrcPath));
        Path dst = this.copyToRemoteApplicationDir(localSrcPath, relativeDstPath, this.fileReplication);
        FileStatus[] fss = this.waitForTransferToComplete(dst);
        if (fss == null || fss.length <= 0) {
            LOG.debug("Failed to fetch remote modification time from {}, using local timestamp {}", (Object)dst, (Object)localFile.lastModified());
            return Tuple2.of((Object)dst, (Object)localFile.lastModified());
        }
        LOG.debug("Got modification time {} from remote path {}", (Object)fss[0].getModificationTime(), (Object)dst);
        return Tuple2.of((Object)dst, (Object)fss[0].getModificationTime());
    }

    List<String> registerMultipleLocalResources(Collection<Path> shipFiles, String localResourcesDirectory, LocalResourceType resourceType) throws IOException {
        ArrayList<Path> localPaths = new ArrayList<Path>();
        ArrayList<Path> relativePaths = new ArrayList<Path>();
        for (Path shipFile : shipFiles) {
            block9: {
                if (Utils.isRemotePath(shipFile.toString())) {
                    if (this.fileSystem.isDirectory(shipFile)) {
                        URI parentURI = shipFile.getParent().toUri();
                        RemoteIterator iterable = this.fileSystem.listFiles(shipFile, true);
                        while (iterable.hasNext()) {
                            Path current = ((LocatedFileStatus)iterable.next()).getPath();
                            localPaths.add(current);
                            relativePaths.add(new Path(localResourcesDirectory, parentURI.relativize(current.toUri()).getPath()));
                        }
                        continue;
                    }
                } else {
                    File file = new File(shipFile.toUri().getPath());
                    if (file.isDirectory()) {
                        java.nio.file.Path shipPath = file.toPath().toRealPath(new LinkOption[0]);
                        java.nio.file.Path parentPath = shipPath.getParent();
                        Collection paths = FileUtils.listFilesInDirectory((java.nio.file.Path)shipPath, path -> true);
                        for (java.nio.file.Path javaPath : paths) {
                            localPaths.add(new Path(javaPath.toUri()));
                            relativePaths.add(new Path(localResourcesDirectory, parentPath.relativize(javaPath).toString()));
                        }
                    }
                }
                break block9;
                continue;
            }
            localPaths.add(shipFile);
            relativePaths.add(new Path(localResourcesDirectory, shipFile.getName()));
        }
        HashSet<String> archives = new HashSet<String>();
        HashSet<String> resources = new HashSet<String>();
        for (int i = 0; i < localPaths.size(); ++i) {
            String key;
            YarnLocalResourceDescriptor resourceDescriptor;
            Path localPath = (Path)localPaths.get(i);
            Path relativePath = (Path)relativePaths.get(i);
            if (YarnApplicationFileUploader.isFlinkDistJar(relativePath.getName()) || (resourceDescriptor = this.registerSingleLocalResource(key = relativePath.toString(), localPath, relativePath.getParent().toString(), resourceType, true, true)).alreadyRegisteredAsLocalResource()) continue;
            if (key.endsWith("jar")) {
                archives.add(relativePath.toString());
                continue;
            }
            resources.add(relativePath.getParent().toString());
        }
        ArrayList<String> classPaths = new ArrayList<String>();
        resources.stream().sorted().forEach(classPaths::add);
        archives.stream().sorted().forEach(classPaths::add);
        return classPaths;
    }

    public YarnLocalResourceDescriptor uploadFlinkDist(Path localJarPath) throws IOException, ClusterDeploymentException {
        if (this.flinkDist != null) {
            return this.flinkDist;
        }
        if (!this.providedSharedLibs.isEmpty()) {
            throw new ClusterDeploymentException("The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" has to also include the lib/, plugin/ and flink-dist jar. In other case, it cannot be used.");
        }
        this.flinkDist = this.registerSingleLocalResource(localJarPath.getName(), localJarPath, "", LocalResourceType.FILE, true, false);
        return this.flinkDist;
    }

    List<String> registerProvidedLocalResources() {
        Preconditions.checkNotNull(this.localResources);
        ArrayList<String> classPaths = new ArrayList<String>();
        HashSet resourcesJar = new HashSet();
        HashSet resourcesDir = new HashSet();
        this.providedSharedLibs.forEach((fileName, fileStatus) -> {
            Path filePath = fileStatus.getPath();
            LOG.debug("Using remote file {} to register local resource", (Object)filePath);
            YarnLocalResourceDescriptor descriptor = YarnLocalResourceDescriptor.fromFileStatus(fileName, fileStatus, LocalResourceVisibility.PUBLIC, LocalResourceType.FILE);
            this.localResources.put((String)fileName, descriptor.toLocalResource());
            this.remotePaths.add(filePath);
            this.envShipResourceList.add(descriptor);
            if (!YarnApplicationFileUploader.isFlinkDistJar(filePath.getName()) && !YarnApplicationFileUploader.isPlugin(filePath)) {
                if (fileName.endsWith("jar")) {
                    resourcesJar.add(fileName);
                } else {
                    resourcesDir.add(new Path(fileName).getParent().toString());
                }
            } else if (YarnApplicationFileUploader.isFlinkDistJar(filePath.getName())) {
                this.flinkDist = descriptor;
            }
        });
        resourcesDir.stream().sorted().forEach(classPaths::add);
        resourcesJar.stream().sorted().forEach(classPaths::add);
        return classPaths;
    }

    static YarnApplicationFileUploader from(FileSystem fileSystem, Path homeDirectory, List<Path> providedLibDirs, ApplicationId applicationId, int fileReplication) throws IOException {
        return new YarnApplicationFileUploader(fileSystem, homeDirectory, providedLibDirs, applicationId, fileReplication);
    }

    private Path copyToRemoteApplicationDir(Path localSrcPath, String relativeDstPath, int replicationFactor) throws IOException {
        Path applicationDir = YarnApplicationFileUploader.getApplicationDirPath(this.homeDir, this.applicationId);
        String suffix = (String)(relativeDstPath.isEmpty() ? "" : relativeDstPath + "/") + localSrcPath.getName();
        Path dst = new Path(applicationDir, suffix);
        Path localSrcPathWithScheme = StringUtils.isNullOrWhitespaceOnly((String)localSrcPath.toUri().getScheme()) ? new Path(URI.create("file:///").resolve(localSrcPath.toUri())) : localSrcPath;
        LOG.debug("Copying from {} to {} with replication factor {}", new Object[]{localSrcPathWithScheme, dst, replicationFactor});
        this.fileSystem.copyFromLocalFile(false, true, localSrcPathWithScheme, dst);
        this.fileSystem.setReplication(dst, (short)replicationFactor);
        return dst;
    }

    private FileStatus[] waitForTransferToComplete(Path dst) throws IOException {
        int noOfRetries = 3;
        int retryDelayMs = 100;
        for (int iter = 1; iter <= 4; ++iter) {
            try {
                return this.fileSystem.listStatus(dst);
            }
            catch (FileNotFoundException e) {
                LOG.debug("Got FileNotFoundException while fetching uploaded remote resources at retry num {}", (Object)iter);
                try {
                    LOG.debug("Sleeping for {}ms", (Object)100);
                    TimeUnit.MILLISECONDS.sleep(100L);
                }
                catch (InterruptedException ie) {
                    LOG.warn("Failed to sleep for {}ms at retry num {} while fetching uploaded remote resources", new Object[]{100, iter, ie});
                }
                continue;
            }
        }
        return null;
    }

    private static boolean isFlinkDistJar(String fileName) {
        return fileName.startsWith("flink-dist") && fileName.endsWith("jar");
    }

    private static boolean isPlugin(Path path) {
        for (Path parent = path.getParent(); parent != null; parent = parent.getParent()) {
            if (!"plugins".equals(parent.getName())) continue;
            return true;
        }
        return false;
    }

    static Path getApplicationDirPath(Path homeDir, ApplicationId applicationId) {
        return new Path((Path)Preconditions.checkNotNull((Object)homeDir), ".flink/" + Preconditions.checkNotNull((Object)applicationId) + "/");
    }

    private Path getApplicationDir(ApplicationId applicationId) throws IOException {
        Path applicationDir = YarnApplicationFileUploader.getApplicationDirPath(this.homeDir, applicationId);
        if (!this.fileSystem.exists(applicationDir)) {
            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
            this.fileSystem.mkdirs(applicationDir, permission);
        }
        return applicationDir;
    }

    private Map<String, FileStatus> getAllFilesInProvidedLibDirs(List<Path> providedLibDirs) {
        HashMap allFiles = new HashMap();
        ((List)Preconditions.checkNotNull(providedLibDirs)).forEach(FunctionUtils.uncheckedConsumer(path -> {
            if (!this.fileSystem.exists(path) || !this.fileSystem.isDirectory(path)) {
                LOG.warn("Provided lib dir {} does not exist or is not a directory. Ignoring.", path);
            } else {
                RemoteIterator iterable = this.fileSystem.listFiles(path, true);
                while (iterable.hasNext()) {
                    LocatedFileStatus locatedFileStatus = (LocatedFileStatus)iterable.next();
                    String name = path.getParent().toUri().relativize(locatedFileStatus.getPath().toUri()).toString();
                    FileStatus prevMapping = (FileStatus)allFiles.put(name, locatedFileStatus);
                    if (prevMapping == null) continue;
                    throw new IOException("Two files with the same filename exist in the shared libs: " + prevMapping.getPath() + " - " + locatedFileStatus.getPath() + ". Please deduplicate.");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("The following files were found in the shared lib dir: {}", (Object)allFiles.values().stream().map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.joining(", ")));
                }
            }
        }));
        return Collections.unmodifiableMap(allFiles);
    }

    private boolean isUsrLibDirIncludedInProvidedLib(List<Path> providedLibDirs) throws IOException {
        for (Path path : providedLibDirs) {
            if (!Utils.isUsrLibDirectory(this.fileSystem, path)) continue;
            return true;
        }
        return false;
    }

    private void addToRemotePaths(boolean add, Path path) {
        if (add) {
            this.remotePaths.add(path);
        }
    }

    private void addToEnvShipResourceList(boolean add, YarnLocalResourceDescriptor descriptor) {
        if (add) {
            this.envShipResourceList.add(descriptor);
        }
    }
}

