package org.apache.hadoop.yarn.service.client;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KDiag;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.FindClass;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.JavaCommandLineBuilder;
import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.exceptions.BadConfigException;
import org.apache.hadoop.yarn.service.exceptions.ErrorStrings;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ZookeeperUtils;
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;

@InterfaceAudience.Public
@InterfaceStability.Unstable
/* loaded from: input_file:WEB-INF/lib/hadoop-yarn-services-core-3.3.4.208-eep-911.jar:org/apache/hadoop/yarn/service/client/ServiceClient.class */
public class ServiceClient extends AppAdminClient implements SliderExitCodes, YarnServiceConstants {
    private SliderFileSystem fs;
    protected YarnClient yarnClient;
    private Map<String, AppInfo> cachedAppInfo = new ConcurrentHashMap();
    private RegistryOperations registryClient;
    private CuratorFramework curatorClient;
    private YarnRPC rpc;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ServiceClient.class);
    private static EnumSet<YarnApplicationState> terminatedStates = EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED);
    private static EnumSet<YarnApplicationState> liveStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING);
    private static EnumSet<YarnApplicationState> preRunningStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-yarn-services-core-3.3.4.208-eep-911.jar:org/apache/hadoop/yarn/service/client/ServiceClient$AppInfo.class */
    public static class AppInfo {
        ApplicationId appId;
        String principalName;

        AppInfo(ApplicationId applicationId, String str) {
            this.appId = applicationId;
            this.principalName = str;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.CompositeService, org.apache.hadoop.service.AbstractService
    public void serviceInit(Configuration configuration) throws Exception {
        this.fs = new SliderFileSystem(configuration);
        this.yarnClient = YarnClient.createYarnClient();
        this.rpc = YarnRPC.create(configuration);
        addService(this.yarnClient);
        super.serviceInit(configuration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.CompositeService, org.apache.hadoop.service.AbstractService
    public void serviceStop() throws Exception {
        if (this.registryClient != null) {
            this.registryClient.stop();
        }
        this.fs.getFileSystem().close();
        super.serviceStop();
    }

    public Service loadAppJsonFromLocalFS(String str, String str2, Long l, String str3) throws IOException, YarnException {
        String[] split;
        File file = new File(str);
        if (!file.exists() && str.equals(file.getName())) {
            String str4 = System.getenv("YARN_SERVICE_EXAMPLES_DIR");
            if (str4 == null) {
                String str5 = System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
                split = new String[]{str5 + "/share/hadoop/yarn/yarn-service-examples", str5 + "/yarn-service-examples"};
            } else {
                split = StringUtils.split(str4, ":");
            }
            for (String str6 : split) {
                file = new File(MessageFormat.format("{0}/{1}/{2}.json", str6, str, str));
                if (file.exists()) {
                    break;
                }
                file = new File(MessageFormat.format("{0}/{1}.json", str6, str));
                if (file.exists()) {
                    break;
                }
            }
        }
        if (!file.exists()) {
            throw new YarnException("File or example could not be found: " + str);
        }
        Path path = new Path(file.getAbsolutePath());
        LOG.info("Loading service definition from local FS: " + path);
        Service load = ServiceApiUtil.jsonSerDeser.load(FileSystem.getLocal(getConfig()), path);
        if (!StringUtils.isEmpty(str2)) {
            load.setName(str2);
        }
        if (l != null && l.longValue() > 0) {
            load.setLifetime(l);
        }
        if (!StringUtils.isEmpty(str3)) {
            load.setQueue(str3);
        }
        return load;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionSave(String str, String str2, Long l, String str3) throws IOException, YarnException {
        return actionBuild(loadAppJsonFromLocalFS(str, str2, l, str3));
    }

    public int actionBuild(Service service) throws YarnException, IOException {
        ServiceApiUtil.validateAndResolveService(service, this.fs, getConfig());
        ServiceApiUtil.createDirAndPersistApp(this.fs, checkAppNotExistOnHdfs(service, false), service);
        return 0;
    }

    private ApplicationReport upgradePrecheck(Service service) throws YarnException, IOException {
        if (!getConfig().getBoolean(YarnServiceConf.YARN_SERVICE_UPGRADE_ENABLED, false)) {
            throw new YarnException(ErrorStrings.SERVICE_UPGRADE_DISABLED);
        }
        Service loadService = ServiceApiUtil.loadService(this.fs, service.getName());
        if (!StringUtils.isEmpty(loadService.getId())) {
            this.cachedAppInfo.put(loadService.getName(), new AppInfo(ApplicationId.fromString(loadService.getId()), loadService.getKerberosPrincipal().getPrincipalName()));
        }
        if (loadService.getVersion().equals(service.getVersion())) {
            String str = service.getName() + " is already at version " + service.getVersion() + ". There is nothing to upgrade.";
            LOG.error(str);
            throw new YarnException(str);
        }
        boolean z = false;
        Iterator<Component> it = loadService.getComponents().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (!it.next().getRestartPolicy().equals(Component.RestartPolicyEnum.NEVER)) {
                z = true;
                break;
            }
        }
        if (!z) {
            String str2 = "All the components of the service " + service.getName() + " have " + Component.RestartPolicyEnum.NEVER + " restart policy, so it cannot be upgraded.";
            LOG.error(str2);
            throw new YarnException(str2);
        }
        Service status = getStatus(service.getName());
        if (!status.getState().equals(ServiceState.STABLE)) {
            String str3 = service.getName() + " is at " + status.getState() + " state and upgrade can only be initiated when service is STABLE.";
            LOG.error(str3);
            throw new YarnException(str3);
        }
        Path checkAppNotExistOnHdfs = checkAppNotExistOnHdfs(service, true);
        ServiceApiUtil.validateAndResolveService(service, this.fs, getConfig());
        ServiceApiUtil.createDirAndPersistApp(this.fs, checkAppNotExistOnHdfs, service);
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(getAppId(service.getName()));
        if (StringUtils.isEmpty(applicationReport.getHost())) {
            throw new YarnException(service.getName() + " AM hostname is empty");
        }
        return applicationReport;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionUpgradeExpress(String str, File file) throws IOException, YarnException {
        Service loadAppJsonFromLocalFS = loadAppJsonFromLocalFS(file.getAbsolutePath(), str, null, null);
        loadAppJsonFromLocalFS.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
        actionUpgradeExpress(loadAppJsonFromLocalFS);
        return 0;
    }

    public int actionUpgradeExpress(Service service) throws YarnException, IOException {
        ClientAMProtocol createAMProxy = createAMProxy(service.getName(), upgradePrecheck(service));
        ClientAMProtocol.UpgradeServiceRequestProto.Builder newBuilder = ClientAMProtocol.UpgradeServiceRequestProto.newBuilder();
        newBuilder.setVersion(service.getVersion());
        if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
            newBuilder.setAutoFinalize(true);
        }
        if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
            newBuilder.setExpressUpgrade(true);
            newBuilder.setAutoFinalize(true);
        }
        ClientAMProtocol.UpgradeServiceResponseProto upgrade = createAMProxy.upgrade(newBuilder.build());
        if (!upgrade.hasError()) {
            return 0;
        }
        LOG.error("Service {} express upgrade to version {} failed because {}", service.getName(), service.getVersion(), upgrade.getError());
        throw new YarnException("Failed to express upgrade service " + service.getName() + " to version " + service.getVersion() + " because " + upgrade.getError());
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int initiateUpgrade(String str, String str2, boolean z) throws IOException, YarnException {
        Service loadAppJsonFromLocalFS = loadAppJsonFromLocalFS(str2, str, null, null);
        if (z) {
            loadAppJsonFromLocalFS.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
        } else {
            loadAppJsonFromLocalFS.setState(ServiceState.UPGRADING);
        }
        return initiateUpgrade(loadAppJsonFromLocalFS);
    }

    public int initiateUpgrade(Service service) throws YarnException, IOException {
        org.apache.hadoop.yarn.service.ClientAMProtocol createAMProxy = createAMProxy(service.getName(), upgradePrecheck(service));
        ClientAMProtocol.UpgradeServiceRequestProto.Builder newBuilder = ClientAMProtocol.UpgradeServiceRequestProto.newBuilder();
        newBuilder.setVersion(service.getVersion());
        if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
            newBuilder.setAutoFinalize(true);
        }
        ClientAMProtocol.UpgradeServiceResponseProto upgrade = createAMProxy.upgrade(newBuilder.build());
        if (!upgrade.hasError()) {
            return 0;
        }
        LOG.error("Service {} upgrade to version {} failed because {}", service.getName(), service.getVersion(), upgrade.getError());
        throw new YarnException("Failed to upgrade service " + service.getName() + " to version " + service.getVersion() + " because " + upgrade.getError());
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionUpgradeInstances(String str, List<String> list) throws IOException, YarnException {
        checkAppExistOnHdfs(str);
        Service loadService = ServiceApiUtil.loadService(this.fs, str);
        List<Container> liveContainers = ServiceApiUtil.getLiveContainers(loadService, list);
        ServiceApiUtil.validateInstancesUpgrade(liveContainers);
        return actionUpgrade(loadService, liveContainers);
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionUpgradeComponents(String str, List<String> list) throws IOException, YarnException {
        checkAppExistOnHdfs(str);
        Service loadService = ServiceApiUtil.loadService(this.fs, str);
        return actionUpgrade(loadService, ServiceApiUtil.validateAndResolveCompsUpgrade(loadService, list));
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionCancelUpgrade(String str) throws IOException, YarnException {
        Service status = getStatus(str);
        if (status == null || !ServiceState.isUpgrading(status.getState())) {
            throw new YarnException("Service " + str + " is not upgrading, so nothing to cancel.");
        }
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(getAppId(str));
        if (StringUtils.isEmpty(applicationReport.getHost())) {
            throw new YarnException(str + " AM hostname is empty");
        }
        createAMProxy(str, applicationReport).cancelUpgrade(ClientAMProtocol.CancelUpgradeRequestProto.newBuilder().build());
        return 0;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionDecommissionInstances(String str, List<String> list) throws IOException, YarnException {
        checkAppExistOnHdfs(str);
        Service loadService = ServiceApiUtil.loadService(this.fs, str);
        if (StringUtils.isEmpty(loadService.getId())) {
            throw new YarnException(loadService.getName() + " appId is null, may be not submitted to YARN yet");
        }
        this.cachedAppInfo.put(loadService.getName(), new AppInfo(ApplicationId.fromString(loadService.getId()), loadService.getKerberosPrincipal().getPrincipalName()));
        for (String str2 : list) {
            Component component = loadService.getComponent(ServiceApiUtil.parseComponentName(ServiceApiUtil.parseAndValidateComponentInstanceName(str2, str, getConfig())));
            if (component == null) {
                throw new IllegalArgumentException(str2 + " does not exist !");
            }
            if (!component.getDecommissionedInstances().contains(str2)) {
                component.addDecommissionedInstance(str2);
                component.setNumberOfContainers(Long.valueOf(Math.max(0L, component.getNumberOfContainers().longValue() - 1)));
            }
        }
        ServiceApiUtil.writeAppDefinition(this.fs, loadService);
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(ApplicationId.fromString(loadService.getId()));
        if (applicationReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
            String str3 = loadService.getName() + " is at " + applicationReport.getYarnApplicationState() + " state, decommission can only be invoked when service is running";
            LOG.error(str3);
            throw new YarnException(str3);
        }
        if (StringUtils.isEmpty(applicationReport.getHost())) {
            throw new YarnException(loadService.getName() + " AM hostname is empty");
        }
        org.apache.hadoop.yarn.service.ClientAMProtocol createAMProxy = createAMProxy(loadService.getName(), applicationReport);
        ClientAMProtocol.DecommissionCompInstancesRequestProto.Builder newBuilder = ClientAMProtocol.DecommissionCompInstancesRequestProto.newBuilder();
        newBuilder.addAllCompInstances(list);
        createAMProxy.decommissionCompInstances(newBuilder.build());
        return 0;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionCleanUp(String str, String str2) throws IOException, YarnException {
        return cleanUpRegistry(str, str2) ? 0 : -1;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public String getInstances(String str, List<String> list, String str2, List<String> list2) throws IOException, YarnException {
        return filterContainers(str, list, str2, list2).getCompInstances();
    }

    public ComponentContainers[] getContainers(String str, List<String> list, String str2, List<ContainerState> list2) throws IOException, YarnException {
        return ServiceApiUtil.COMP_CONTAINERS_JSON_SERDE.fromJson(filterContainers(str, list, str2, list2 != null ? (List) list2.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList()) : null).getCompInstances());
    }

    private ClientAMProtocol.GetCompInstancesResponseProto filterContainers(String str, List<String> list, String str2, List<String> list2) throws IOException, YarnException {
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(getAppId(str));
        if (StringUtils.isEmpty(applicationReport.getHost())) {
            throw new YarnException(str + " AM hostname is empty.");
        }
        org.apache.hadoop.yarn.service.ClientAMProtocol createAMProxy = createAMProxy(str, applicationReport);
        ClientAMProtocol.GetCompInstancesRequestProto.Builder newBuilder = ClientAMProtocol.GetCompInstancesRequestProto.newBuilder();
        if (list != null && !list.isEmpty()) {
            newBuilder.addAllComponentNames(list);
        }
        if (str2 != null) {
            newBuilder.setVersion(str2);
        }
        if (list2 != null && !list2.isEmpty()) {
            newBuilder.addAllContainerStates(list2);
        }
        return createAMProxy.getCompInstances(newBuilder.build());
    }

    public int actionUpgrade(Service service, List<Container> list) throws IOException, YarnException {
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(getAppId(service.getName()));
        if (applicationReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
            String str = service.getName() + " is at " + applicationReport.getYarnApplicationState() + " state, upgrade can only be invoked when service is running.";
            LOG.error(str);
            throw new YarnException(str);
        }
        if (StringUtils.isEmpty(applicationReport.getHost())) {
            throw new YarnException(service.getName() + " AM hostname is empty.");
        }
        org.apache.hadoop.yarn.service.ClientAMProtocol createAMProxy = createAMProxy(service.getName(), applicationReport);
        ArrayList arrayList = new ArrayList();
        list.forEach(container -> {
            arrayList.add(container.getId());
        });
        LOG.info("instances to upgrade {}", arrayList);
        ClientAMProtocol.CompInstancesUpgradeRequestProto.Builder newBuilder = ClientAMProtocol.CompInstancesUpgradeRequestProto.newBuilder();
        newBuilder.addAllContainerIds(arrayList);
        createAMProxy.upgrade(newBuilder.build());
        return 0;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionLaunch(String str, String str2, Long l, String str3) throws IOException, YarnException {
        actionCreate(loadAppJsonFromLocalFS(str, str2, l, str3));
        return 0;
    }

    public ApplicationId actionCreate(Service service) throws IOException, YarnException {
        String name = service.getName();
        ServiceApiUtil.validateAndResolveService(service, this.fs, getConfig());
        verifyNoLiveAppInRM(name, FindClass.A_CREATE);
        Path checkAppNotExistOnHdfs = checkAppNotExistOnHdfs(service, false);
        ServiceApiUtil.createDirAndPersistApp(this.fs, checkAppNotExistOnHdfs, service);
        try {
            ApplicationId submitApp = submitApp(service);
            this.cachedAppInfo.put(name, new AppInfo(submitApp, service.getKerberosPrincipal().getPrincipalName()));
            service.setId(submitApp.toString());
            ServiceApiUtil.writeAppDefinition(this.fs, checkAppNotExistOnHdfs, service);
            return submitApp;
        } catch (YarnException e) {
            actionDestroy(name);
            throw e;
        }
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionFlex(String str, Map<String, String> map) throws YarnException, IOException {
        HashMap hashMap = new HashMap(map.size());
        Service loadService = ServiceApiUtil.loadService(this.fs, str);
        if (StringUtils.isEmpty(loadService.getId())) {
            throw new YarnException(loadService.getName() + " appId is null, may be not submitted to YARN yet");
        }
        this.cachedAppInfo.put(loadService.getName(), new AppInfo(ApplicationId.fromString(loadService.getId()), loadService.getKerberosPrincipal().getPrincipalName()));
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String key = entry.getKey();
            ServiceApiUtil.validateNameFormat(key, getConfig());
            Component component = loadService.getComponent(key);
            if (component == null) {
                throw new IllegalArgumentException(entry.getKey() + " does not exist !");
            }
            hashMap.put(key, Long.valueOf(parseNumberOfContainers(component, entry.getValue())));
        }
        flexComponents(str, hashMap, loadService);
        return 0;
    }

    private long parseNumberOfContainers(Component component, String str) {
        long longValue = component.getNumberOfContainers().longValue();
        if (str.startsWith(Marker.ANY_NON_NULL_MARKER)) {
            return longValue + Long.parseLong(str.substring(1));
        }
        if (!str.startsWith(HelpFormatter.DEFAULT_OPT_PREFIX)) {
            return Long.parseLong(str);
        }
        long parseLong = longValue - Long.parseLong(str.substring(1));
        if (parseLong < 0) {
            LOG.warn(MessageFormat.format("[COMPONENT {0}]: component count goes to negative ({1}{2} = {3}), ignore and reset it to 0.", component.getName(), Long.valueOf(longValue), str, Long.valueOf(parseLong)));
            parseLong = 0;
        }
        return parseLong;
    }

    public Map<String, Long> flexByRestService(String str, Map<String, Long> map) throws YarnException, IOException {
        Service loadService = ServiceApiUtil.loadService(this.fs, str);
        if (StringUtils.isEmpty(loadService.getId())) {
            throw new YarnException(str + " appId is null, may be not submitted to YARN yet");
        }
        this.cachedAppInfo.put(loadService.getName(), new AppInfo(ApplicationId.fromString(loadService.getId()), loadService.getKerberosPrincipal().getPrincipalName()));
        return flexComponents(str, map, loadService);
    }

    private Map<String, Long> flexComponents(String str, Map<String, Long> map, Service service) throws YarnException, IOException {
        ServiceApiUtil.validateNameFormat(str, getConfig());
        HashMap hashMap = new HashMap(map.size());
        ClientAMProtocol.ComponentCountProto.Builder newBuilder = ClientAMProtocol.ComponentCountProto.newBuilder();
        ClientAMProtocol.FlexComponentsRequestProto.Builder newBuilder2 = ClientAMProtocol.FlexComponentsRequestProto.newBuilder();
        for (Component component : service.getComponents()) {
            String name = component.getName();
            if (map.containsKey(component.getName())) {
                hashMap.put(name, component.getNumberOfContainers());
                component.setNumberOfContainers(map.get(name));
                newBuilder.setName(component.getName()).setNumberOfContainers(component.getNumberOfContainers().longValue());
                newBuilder2.addComponents(newBuilder.build());
            }
        }
        if (hashMap.size() < map.size()) {
            map.keySet().removeAll(hashMap.keySet());
            throw new YarnException("Components " + map.keySet() + " do not exist in app definition.");
        }
        ServiceApiUtil.writeAppDefinition(this.fs, service);
        ApplicationId appId = getAppId(str);
        if (appId == null) {
            String str2 = "Application ID doesn't exist for " + str;
            LOG.error(str2);
            throw new YarnException(str2);
        }
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(appId);
        if (applicationReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
            String str3 = str + " is at " + applicationReport.getYarnApplicationState() + " state, flex can only be invoked when service is running";
            LOG.error(str3);
            throw new YarnException(str3);
        }
        Service status = getStatus(str);
        if (status.getState().equals(ServiceState.UPGRADING) || status.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
            String str4 = str + " is at " + status.getState() + " state, flex can not be invoked when service is upgrading. ";
            LOG.error(str4);
            throw new YarnException(str4);
        }
        if (StringUtils.isEmpty(applicationReport.getHost())) {
            throw new YarnException(str + " AM hostname is empty");
        }
        createAMProxy(str, applicationReport).flexComponents(newBuilder2.build());
        for (Map.Entry entry : hashMap.entrySet()) {
            LOG.info("[COMPONENT {}]: number of containers changed from {} to {}", entry.getKey(), entry.getValue(), map.get(entry.getKey()));
        }
        return hashMap;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionStop(String str) throws YarnException, IOException {
        return actionStop(str, true);
    }

    public int actionStop(String str, boolean z) throws YarnException, IOException {
        org.apache.hadoop.yarn.service.ClientAMProtocol createAMProxy;
        ServiceApiUtil.validateNameFormat(str, getConfig());
        ApplicationId appId = getAppId(str);
        if (appId == null) {
            LOG.info("Application ID doesn't exist for service {}", str);
            cleanUpRegistry(str);
            return 40;
        }
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(appId);
        if (terminatedStates.contains(applicationReport.getYarnApplicationState())) {
            LOG.info("Service {} is already in a terminated state {}", str, applicationReport.getYarnApplicationState());
            cleanUpRegistry(str);
            return 40;
        }
        if (preRunningStates.contains(applicationReport.getYarnApplicationState())) {
            String str2 = str + " is at " + applicationReport.getYarnApplicationState() + ", forcefully killed by user!";
            this.yarnClient.killApplication(appId, str2);
            LOG.info(str2);
            cleanUpRegistry(str);
            return 0;
        }
        if (StringUtils.isEmpty(applicationReport.getHost())) {
            throw new YarnException(str + " AM hostname is empty");
        }
        LOG.info("Stopping service {}, with appId = {}", str, appId);
        try {
            createAMProxy = createAMProxy(str, applicationReport);
            this.cachedAppInfo.remove(str);
        } catch (IOException | InterruptedException | YarnException e) {
            LOG.info("Failed to stop " + str + " gracefully due to: " + e.getMessage() + ", forcefully kill the app.");
            this.yarnClient.killApplication(appId, "Forcefully kill the app");
        }
        if (createAMProxy == null) {
            this.yarnClient.killApplication(appId, str + " is forcefully killed by user!");
            LOG.info("Forcefully kill the service: " + str);
            cleanUpRegistry(str);
            return 0;
        }
        createAMProxy.stop(ClientAMProtocol.StopRequestProto.newBuilder().build());
        LOG.info("Service " + str + " is being gracefully stopped...");
        if (!z) {
            cleanUpRegistry(str);
            return 0;
        }
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        while (true) {
            Thread.sleep(2000L);
            if (terminatedStates.contains(this.yarnClient.getApplicationReport(appId).getYarnApplicationState())) {
                LOG.info("Service " + str + " is stopped.");
                break;
            }
            if (System.currentTimeMillis() - currentTimeMillis > 10000) {
                LOG.info("Stop operation timeout stopping, forcefully kill the app " + str);
                this.yarnClient.killApplication(appId, "Forcefully kill the app by user");
                break;
            }
            i++;
            if (i % 10 == 0) {
                LOG.info("Waiting for service " + str + " to be stopped.");
            }
        }
        cleanUpRegistry(str);
        return 0;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionDestroy(String str) throws YarnException, IOException {
        ServiceApiUtil.validateNameFormat(str, getConfig());
        verifyNoLiveAppInRM(str, ApplicationCLI.DESTROY_CMD);
        Path buildClusterDirPath = this.fs.buildClusterDirPath(str);
        FileSystem fileSystem = this.fs.getFileSystem();
        this.cachedAppInfo.remove(str);
        int i = 0;
        if (!fileSystem.exists(buildClusterDirPath)) {
            LOG.info("Service '" + str + "' doesn't exist at hdfs path: " + buildClusterDirPath);
            i = 44;
        } else {
            if (!fileSystem.delete(buildClusterDirPath, true)) {
                String str2 = "Failed to delete service + " + str + " at:  " + buildClusterDirPath;
                LOG.info(str2);
                throw new YarnException(str2);
            }
            LOG.info("Successfully deleted service dir for " + str + ": " + buildClusterDirPath);
        }
        Path path = new Path(this.fs.getBasePath(), str);
        if (fileSystem.exists(path)) {
            if (!fileSystem.delete(path, true)) {
                String str3 = "Failed to delete public resource dir for service " + str + " at:  " + path;
                LOG.info(str3);
                throw new YarnException(str3);
            }
            LOG.info("Successfully deleted public resource dir for " + str + ": " + path);
        }
        try {
            deleteZKNode(str);
            if (!cleanUpRegistry(str) && i == 0) {
                i = 5;
            }
            if (i == 0) {
                LOG.info("Successfully destroyed service {}", str);
                return i;
            }
            if (i == 44) {
                LOG.error("Error on destroy '" + str + "': not found.");
                return i;
            }
            LOG.error("Error on destroy '" + str + "': error cleaning up registry.");
            return i;
        } catch (Exception e) {
            throw new IOException("Could not delete zk node for " + str, e);
        }
    }

    private boolean cleanUpRegistry(String str, String str2) throws SliderException {
        return cleanUpRegistryPath(RegistryUtils.servicePath(RegistryUtils.registryUser(str2), "yarn-service", str), str);
    }

    private boolean cleanUpRegistry(String str) throws SliderException {
        return cleanUpRegistryPath(ServiceRegistryUtils.registryPathForInstance(str), str);
    }

    private boolean cleanUpRegistryPath(String str, String str2) throws SliderException {
        try {
            if (getRegistryClient().exists(str)) {
                getRegistryClient().delete(str, true);
            } else {
                LOG.info("Service '" + str2 + "' doesn't exist at ZK registry path: " + str);
            }
            return true;
        } catch (IOException e) {
            LOG.warn("Error deleting registry entry {}", str, e);
            return false;
        }
    }

    private synchronized RegistryOperations getRegistryClient() throws SliderException, IOException {
        if (this.registryClient == null) {
            this.registryClient = RegistryOperationsFactory.createInstance("ServiceClient", getConfig());
            this.registryClient.init(getConfig());
            this.registryClient.start();
        }
        return this.registryClient;
    }

    private boolean deleteZKNode(String str) throws Exception {
        CuratorFramework curatorClient = getCuratorClient();
        String mkServiceHomePath = ServiceRegistryUtils.mkServiceHomePath(RegistryUtils.currentUser(), str);
        if (curatorClient.checkExists().forPath(mkServiceHomePath) == null) {
            LOG.info("Service '" + str + "' doesn't exist at ZK path: " + mkServiceHomePath);
            return false;
        }
        curatorClient.delete().deletingChildrenIfNeeded().forPath(mkServiceHomePath);
        LOG.info("Deleted zookeeper path: " + mkServiceHomePath);
        return true;
    }

    private synchronized CuratorFramework getCuratorClient() throws BadConfigException {
        String str = getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
        if (ServiceUtils.isUnset(str)) {
            throw new BadConfigException("No Zookeeper quorum provided in the configuration property hadoop.registry.zk.quorum");
        }
        ZookeeperUtils.splitToHostsAndPortsStrictly(str);
        if (this.curatorClient == null) {
            this.curatorClient = CuratorFrameworkFactory.builder().connectString(str).sessionTimeoutMs(10000).retryPolicy(new RetryNTimes(5, 2000)).build();
            this.curatorClient.start();
        }
        return this.curatorClient;
    }

    private void verifyNoLiveAppInRM(String str, String str2) throws IOException, YarnException {
        HashSet hashSet = new HashSet(1);
        hashSet.add("yarn-service");
        Set<String> set = null;
        if (str != null) {
            set = Collections.singleton(ServiceUtils.createNameTag(str));
        }
        GetApplicationsRequest newInstance = GetApplicationsRequest.newInstance();
        newInstance.setApplicationTypes(hashSet);
        newInstance.setApplicationTags(set);
        newInstance.setApplicationStates(liveStates);
        String userName = UserGroupInformation.getCurrentUser().getUserName();
        if (userName != null) {
            newInstance.setUsers(Collections.singleton(userName));
        }
        if (this.yarnClient.getApplications(newInstance).isEmpty()) {
        } else {
            throw new YarnException(str2.equals(ApplicationCLI.DESTROY_CMD) ? "Failed to destroy service " + str + ", because it is still running." : "Failed to " + str2 + " service " + str + ", because it already exists.");
        }
    }

    @VisibleForTesting
    ApplicationId submitApp(Service service) throws IOException, YarnException {
        String name = service.getName();
        Configuration config = getConfig();
        Path buildClusterDirPath = this.fs.buildClusterDirPath(service.getName());
        YarnClientApplication createApplication = this.yarnClient.createApplication();
        ApplicationSubmissionContext applicationSubmissionContext = createApplication.getApplicationSubmissionContext();
        ServiceApiUtil.validateCompResourceSize(createApplication.getNewApplicationResponse().getMaximumResourceCapability(), service);
        applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
        if (service.getLifetime().longValue() > 0) {
            HashMap hashMap = new HashMap();
            hashMap.put(ApplicationTimeoutType.LIFETIME, service.getLifetime());
            applicationSubmissionContext.setApplicationTimeouts(hashMap);
        }
        applicationSubmissionContext.setMaxAppAttempts(YarnServiceConf.getInt(YarnServiceConf.AM_RESTART_MAX, 20, service.getConfiguration(), config));
        applicationSubmissionContext.setAttemptFailuresValidityInterval(YarnServiceConf.getLong(YarnServiceConf.AM_FAILURES_VALIDITY_INTERVAL, -1L, service.getConfiguration(), config));
        setLogAggregationContext(service, config, applicationSubmissionContext);
        HashMap hashMap2 = new HashMap();
        boolean addAMLog4jResource = addAMLog4jResource(name, config, hashMap2);
        addJarResource(name, hashMap2);
        addKeytabResourceIfSecure(this.fs, hashMap2, service);
        addYarnSysFs(buildClusterDirPath, hashMap2, service);
        if (LOG.isDebugEnabled()) {
            printLocalResources(hashMap2);
        }
        Map<String, String> addAMEnv = addAMEnv();
        String buildCommandLine = buildCommandLine(service, config, buildClusterDirPath, addAMLog4jResource);
        applicationSubmissionContext.setResource(Resource.newInstance(YarnServiceConf.getLong(YarnServiceConf.AM_RESOURCE_MEM, 1024L, service.getConfiguration(), config), 1));
        String queue = service.getQueue();
        if (StringUtils.isEmpty(queue)) {
            queue = config.get(YarnServiceConf.YARN_QUEUE, "default");
        }
        applicationSubmissionContext.setQueue(queue);
        applicationSubmissionContext.setApplicationName(name);
        applicationSubmissionContext.setApplicationType("yarn-service");
        Set<String> createApplicationTags = AbstractClientProvider.createApplicationTags(name, null, null);
        if (!createApplicationTags.isEmpty()) {
            applicationSubmissionContext.setApplicationTags(createApplicationTags);
        }
        ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
        containerLaunchContext.setCommands(Collections.singletonList(buildCommandLine));
        containerLaunchContext.setEnvironment(addAMEnv);
        containerLaunchContext.setLocalResources(hashMap2);
        addCredentials(containerLaunchContext, service);
        applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
        this.yarnClient.submitApplication(applicationSubmissionContext);
        return applicationSubmissionContext.getApplicationId();
    }

    public static File compressFiles(Collection<File> collection, File file, String str) throws IOException {
        FileOutputStream fileOutputStream = new FileOutputStream(file);
        try {
            TarArchiveOutputStream tarArchiveOutputStream = new TarArchiveOutputStream(new BufferedOutputStream(fileOutputStream));
            try {
                tarArchiveOutputStream.setLongFileMode(2);
                Iterator<File> it = collection.iterator();
                while (it.hasNext()) {
                    addFilesToCompression(tarArchiveOutputStream, it.next(), "sysfs", str);
                }
                tarArchiveOutputStream.close();
                fileOutputStream.close();
                return file;
            } finally {
            }
        } catch (Throwable th) {
            try {
                fileOutputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private static void addFilesToCompression(TarArchiveOutputStream tarArchiveOutputStream, File file, String str, String str2) throws IOException {
        if (file.isHidden()) {
            return;
        }
        if (!str.equals(".") && File.separator.equals("\\")) {
            str = str.replaceAll("\\\\", "/");
        }
        tarArchiveOutputStream.putArchiveEntry(new TarArchiveEntry(file, str + "/" + file.getName()));
        if (file.isFile()) {
            FileInputStream fileInputStream = new FileInputStream(file);
            try {
                IOUtils.copy(fileInputStream, tarArchiveOutputStream);
                tarArchiveOutputStream.closeArchiveEntry();
                fileInputStream.close();
                return;
            } catch (Throwable th) {
                try {
                    fileInputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        if (file.isDirectory()) {
            if (!str.equals(".")) {
                tarArchiveOutputStream.closeArchiveEntry();
            }
            File[] listFiles = file.listFiles();
            if (listFiles != null) {
                for (File file2 : listFiles) {
                    addFilesToCompression(tarArchiveOutputStream, file2, file.getPath().substring(str2.length()), str2);
                }
            }
        }
    }

    private void addYarnSysFs(Path path, Map<String, LocalResource> map, Service service) throws IOException {
        ArrayList<Component> arrayList = new ArrayList();
        for (Component component : service.getComponents()) {
            if (Boolean.parseBoolean(component.getConfiguration().getEnv(ApplicationConstants.Environment.YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name()))) {
                arrayList.add(component);
            }
        }
        if (arrayList.size() == 0) {
            return;
        }
        String json = ServiceApiUtil.jsonSerDeser.toJson(service);
        File file = Files.createTempDirectory(new File(System.getProperty("java.io.tmpdir")).toPath(), System.currentTimeMillis() + HelpFormatter.DEFAULT_OPT_PREFIX, new FileAttribute[0]).toFile();
        if (!file.exists()) {
            throw new IOException("Fail to localize sysfs resource.");
        }
        File file2 = new File(file.getAbsolutePath() + "/app.json");
        if (!file2.createNewFile()) {
            throw new IOException("Fail to write app.json to temp directory");
        }
        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(new FileOutputStream(file2), StandardCharsets.UTF_8);
        try {
            outputStreamWriter.write(json);
            outputStreamWriter.close();
            File file3 = new File(file.getAbsolutePath() + "/sysfs.tar");
            if (!file3.createNewFile()) {
                throw new IOException("Fail to localize sysfs.tar.");
            }
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(file2);
            compressFiles(arrayList2, file3, "sysfs");
            LocalResource submitFile = this.fs.submitFile(file3, path, ".", "sysfs.tar");
            Path path2 = new Path(path, "sysfs.tar");
            for (Component component2 : arrayList) {
                ConfigFile configFile = new ConfigFile();
                configFile.type(ConfigFile.TypeEnum.ARCHIVE);
                configFile.srcFile(path2.toString());
                configFile.destFile("/hadoop/yarn");
                if (!component2.getConfiguration().getFiles().contains(configFile)) {
                    component2.getConfiguration().getFiles().add(configFile);
                }
            }
            map.put("sysfs", submitFile);
            if (file.delete()) {
                return;
            }
            LOG.warn("Failed to delete temp file: " + file.getAbsolutePath());
        } catch (Throwable th) {
            try {
                outputStreamWriter.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void setLogAggregationContext(Service service, Configuration configuration, ApplicationSubmissionContext applicationSubmissionContext) {
        LogAggregationContext logAggregationContext = (LogAggregationContext) Records.newRecord(LogAggregationContext.class);
        String str = YarnServiceConf.get(YarnServiceConf.FINAL_LOG_INCLUSION_PATTERN, null, service.getConfiguration(), configuration);
        if (!StringUtils.isEmpty(str)) {
            logAggregationContext.setIncludePattern(str);
        }
        String str2 = YarnServiceConf.get(YarnServiceConf.FINAL_LOG_EXCLUSION_PATTERN, null, service.getConfiguration(), configuration);
        if (!StringUtils.isEmpty(str2)) {
            logAggregationContext.setExcludePattern(str2);
        }
        String str3 = YarnServiceConf.get(YarnServiceConf.ROLLING_LOG_INCLUSION_PATTERN, null, service.getConfiguration(), configuration);
        if (!StringUtils.isEmpty(str3)) {
            logAggregationContext.setRolledLogsIncludePattern(str3);
        }
        String str4 = YarnServiceConf.get(YarnServiceConf.ROLLING_LOG_EXCLUSION_PATTERN, null, service.getConfiguration(), configuration);
        if (!StringUtils.isEmpty(str4)) {
            logAggregationContext.setRolledLogsExcludePattern(str4);
        }
        applicationSubmissionContext.setLogAggregationContext(logAggregationContext);
    }

    private void printLocalResources(Map<String, LocalResource> map) {
        LOG.debug("Added LocalResource for localization: ");
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, LocalResource> entry : map.entrySet()) {
            sb.append(entry.getKey()).append(" -> ").append(entry.getValue().getResource().getFile()).append(System.lineSeparator());
        }
        LOG.debug("{}", sb);
    }

    private String buildCommandLine(Service service, Configuration configuration, Path path, boolean z) throws BadConfigException {
        JavaCommandLineBuilder javaCommandLineBuilder = new JavaCommandLineBuilder();
        javaCommandLineBuilder.forceIPv4().headless();
        String str = YarnServiceConf.get(YarnServiceConf.JVM_OPTS, "", service.getConfiguration(), configuration);
        if (!str.contains("-Xmx")) {
            str = str + YarnServiceConf.DEFAULT_AM_JVM_XMX;
        }
        ServiceApiUtil.validateJvmOpts(str);
        javaCommandLineBuilder.setJVMOpts(str);
        if (z) {
            javaCommandLineBuilder.sysprop("log4j.configuration", YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME);
            javaCommandLineBuilder.sysprop(YarnServiceConstants.SYSPROP_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
        }
        javaCommandLineBuilder.add(ServiceMaster.class.getCanonicalName());
        javaCommandLineBuilder.add("-yarnfile", new Path(path, service.getName() + ".json"));
        javaCommandLineBuilder.add("-service_name", service.getName());
        if (service.getKerberosPrincipal() != null) {
            if (!StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
                javaCommandLineBuilder.add("-keytab", service.getKerberosPrincipal().getKeytab());
            }
            if (!StringUtils.isEmpty(service.getKerberosPrincipal().getPrincipalName())) {
                javaCommandLineBuilder.add("-principal_name", service.getKerberosPrincipal().getPrincipalName());
            }
        }
        javaCommandLineBuilder.addConfOptionToCLI(configuration, RegistryConstants.KEY_REGISTRY_ZK_ROOT, RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
        javaCommandLineBuilder.addMandatoryConfOption(configuration, RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
        javaCommandLineBuilder.addOutAndErrFiles(YarnServiceConstants.STDOUT_AM, YarnServiceConstants.STDERR_AM);
        String build = javaCommandLineBuilder.build();
        LOG.debug("AM launch command: {}", build);
        return build;
    }

    @VisibleForTesting
    protected Map<String, String> addAMEnv() throws IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(AbstractLauncher.CLASSPATH, ServiceUtils.buildClasspath("conf", YarnServiceConstants.APP_LIB_DIR, this.fs, getConfig().get(YarnServiceConf.YARN_SERVICE_CLASSPATH, ""), getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)).buildClasspath());
        hashMap.put("LANG", "en_US.UTF-8");
        hashMap.put("LC_ALL", "en_US.UTF-8");
        hashMap.put("LANGUAGE", "en_US.UTF-8");
        String str = System.getenv(KDiag.HADOOP_JAAS_DEBUG);
        if (str != null) {
            hashMap.put(KDiag.HADOOP_JAAS_DEBUG, str);
        }
        if (!UserGroupInformation.isSecurityEnabled()) {
            String userName = UserGroupInformation.getCurrentUser().getUserName();
            LOG.debug("Run as user {}", userName);
            hashMap.put("HADOOP_USER_NAME", userName);
        }
        LOG.debug("AM env: \n{}", ServiceUtils.stringifyMap(hashMap));
        return hashMap;
    }

    protected Path addJarResource(String str, Map<String, LocalResource> map) throws IOException, YarnException {
        Path buildClusterDirPath = this.fs.buildClusterDirPath(str);
        ProviderUtils.addProviderJar(map, ServiceMaster.class, YarnServiceConstants.SERVICE_CORE_JAR, this.fs, buildClusterDirPath, YarnServiceConstants.APP_LIB_DIR, false);
        Path dependencyTarGzip = this.fs.getDependencyTarGzip();
        if (actionDependency(null, false) == 0) {
            LOG.info("Loading lib tar from " + dependencyTarGzip);
            this.fs.submitTarGzipAndUpdate(map);
        } else {
            if (dependencyTarGzip != null) {
                LOG.warn("Property {} has a value {}, but is not a valid file", YarnServiceConf.DEPENDENCY_TARBALL_PATH, dependencyTarGzip);
            }
            String[] libDirs = ServiceUtils.getLibDirs();
            LOG.info("Uploading all dependency jars to HDFS. For faster submission of apps, set config property {} to the dependency tarball location. Dependency tarball can be uploaded to any HDFS path directly or by using command: yarn app -{} [<Destination Folder>]", YarnServiceConf.DEPENDENCY_TARBALL_PATH, ApplicationCLI.ENABLE_FAST_LAUNCH);
            for (String str2 : libDirs) {
                ProviderUtils.addAllDependencyJars(map, this.fs, buildClusterDirPath, YarnServiceConstants.APP_LIB_DIR, str2);
            }
        }
        return buildClusterDirPath;
    }

    private boolean addAMLog4jResource(String str, Configuration configuration, Map<String, LocalResource> map) throws IOException, BadClusterStateException {
        boolean z = false;
        String str2 = System.getenv(ApplicationConstants.Environment.HADOOP_CONF_DIR.name());
        if (str2 != null) {
            File file = new File(str2, YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME);
            if (file.exists()) {
                Path createLocalPath = ServiceUtils.createLocalPath(file);
                Path path = new Path(this.fs.buildClusterDirPath(str), "conf");
                ServiceUtils.copy(configuration, createLocalPath, new Path(path, YarnServiceConstants.YARN_SERVICE_LOG4J_FILENAME));
                map.put(createLocalPath.getName(), this.fs.createAmResource(path, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
                z = true;
            } else {
                LOG.warn("AM log4j property file doesn't exist: " + file);
            }
        }
        return z;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int actionStart(String str) throws YarnException, IOException {
        actionStartAndGetId(str);
        return 0;
    }

    public ApplicationId actionStartAndGetId(String str) throws YarnException, IOException {
        ServiceApiUtil.validateNameFormat(str, getConfig());
        Service status = getStatus(str);
        if (status != null && status.getState().equals(ServiceState.UPGRADING)) {
            LOG.info("Finalize service {} upgrade", str);
            ApplicationId appId = getAppId(str);
            ApplicationReport applicationReport = this.yarnClient.getApplicationReport(appId);
            if (StringUtils.isEmpty(applicationReport.getHost())) {
                throw new YarnException(str + " AM hostname is empty");
            }
            createAMProxy(str, applicationReport).restart(ClientAMProtocol.RestartServiceRequestProto.newBuilder().build());
            return appId;
        }
        Path checkAppExistOnHdfs = checkAppExistOnHdfs(str);
        Service loadService = ServiceApiUtil.loadService(this.fs, str);
        ServiceApiUtil.validateAndResolveService(loadService, this.fs, getConfig());
        verifyNoLiveAppInRM(str, "start");
        try {
            ApplicationId submitApp = submitApp(loadService);
            this.cachedAppInfo.put(str, new AppInfo(submitApp, loadService.getKerberosPrincipal().getPrincipalName()));
            loadService.setId(submitApp.toString());
            LOG.info("Persisted service " + loadService.getName() + " at " + ServiceApiUtil.writeAppDefinition(this.fs, checkAppExistOnHdfs, loadService));
            return submitApp;
        } catch (YarnException e) {
            actionDestroy(str);
            throw e;
        }
    }

    private Path checkAppNotExistOnHdfs(Service service, boolean z) throws IOException, SliderException {
        Path buildClusterDirPath = !z ? this.fs.buildClusterDirPath(service.getName()) : this.fs.buildClusterUpgradeDirPath(service.getName(), service.getVersion());
        this.fs.verifyDirectoryNonexistent(new Path(buildClusterDirPath, service.getName() + ".json"));
        return buildClusterDirPath;
    }

    private Path checkAppExistOnHdfs(String str) throws IOException, SliderException {
        Path buildClusterDirPath = this.fs.buildClusterDirPath(str);
        this.fs.verifyPathExists(new Path(buildClusterDirPath, str + ".json"));
        return buildClusterDirPath;
    }

    private void addCredentials(ContainerLaunchContext containerLaunchContext, Service service) throws IOException {
        Credentials credentials = new Credentials();
        if (UserGroupInformation.isSecurityEnabled()) {
            String rmPrincipal = YarnClientUtils.getRmPrincipal(getConfig());
            if (StringUtils.isEmpty(rmPrincipal)) {
                throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
            }
            Token<?>[] addDelegationTokens = this.fs.getFileSystem().addDelegationTokens(rmPrincipal, credentials);
            if (LOG.isDebugEnabled() && addDelegationTokens != null && addDelegationTokens.length != 0) {
                for (Token<?> token : addDelegationTokens) {
                    LOG.debug("Got DT: {}", token);
                }
            }
        }
        if (!StringUtils.isEmpty(service.getDockerClientConfig())) {
            credentials.addAll(DockerClientConfigHandler.readCredentialsFromConfigFile(new Path(service.getDockerClientConfig()), getConfig(), service.getName()));
        }
        if (credentials.numberOfTokens() > 0) {
            DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
            credentials.writeTokenStorageToStream(dataOutputBuffer);
            containerLaunchContext.setTokens(ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength()));
        }
    }

    private void addKeytabResourceIfSecure(SliderFileSystem sliderFileSystem, Map<String, LocalResource> map, Service service) throws IOException, YarnException {
        if (UserGroupInformation.isSecurityEnabled()) {
            String principalName = service.getKerberosPrincipal().getPrincipalName();
            if (StringUtils.isEmpty(principalName)) {
                LOG.warn("No Kerberos principal name specified for " + service.getName());
                return;
            }
            if (StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
                LOG.warn("No Kerberos keytab specified for " + service.getName());
                return;
            }
            try {
                URI uri = new URI(service.getKerberosPrincipal().getKeytab());
                if ("file".equals(uri.getScheme())) {
                    LOG.info("Using a keytab from localhost: " + uri);
                    return;
                }
                Path path = new Path(uri);
                if (!sliderFileSystem.getFileSystem().exists(path)) {
                    LOG.warn(service.getName() + "'s keytab (principalName = " + principalName + ") doesn't exist at: " + path);
                } else {
                    map.put(String.format(YarnServiceConstants.KEYTAB_LOCATION, service.getName()), sliderFileSystem.createAmResource(path, LocalResourceType.FILE, LocalResourceVisibility.PRIVATE));
                    LOG.info("Adding " + service.getName() + "'s keytab for localization, uri = " + path);
                }
            } catch (URISyntaxException e) {
                throw new YarnException(e);
            }
        }
    }

    public String updateLifetime(String str, long j) throws YarnException, IOException {
        ApplicationId appId = getAppId(str);
        if (appId == null) {
            throw new YarnException("Application ID not found for " + str);
        }
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(appId);
        if (applicationReport == null) {
            throw new YarnException("Service not found for " + str);
        }
        ApplicationId applicationId = applicationReport.getApplicationId();
        LOG.info("Updating lifetime of an service: serviceName = " + str + ", appId = " + applicationId + ", lifetime = " + j);
        HashMap hashMap = new HashMap();
        String formatISO8601 = Times.formatISO8601(System.currentTimeMillis() + (j * 1000));
        hashMap.put(ApplicationTimeoutType.LIFETIME, formatISO8601);
        this.yarnClient.updateApplicationTimeouts(UpdateApplicationTimeoutsRequest.newInstance(applicationId, hashMap));
        LOG.info("Successfully updated lifetime for an service: serviceName = " + str + ", appId = " + applicationId + ". New expiry time in ISO8601 format is " + formatISO8601);
        return formatISO8601;
    }

    public ServiceState convertState(YarnApplicationState yarnApplicationState) {
        switch (yarnApplicationState) {
            case NEW:
            case NEW_SAVING:
            case SUBMITTED:
            case ACCEPTED:
                return ServiceState.ACCEPTED;
            case RUNNING:
                return ServiceState.STARTED;
            case FINISHED:
            case KILLED:
                return ServiceState.STOPPED;
            case FAILED:
                return ServiceState.FAILED;
            default:
                return ServiceState.ACCEPTED;
        }
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public String getStatusString(String str) throws IOException, YarnException {
        try {
            return getStatusByAppId(ApplicationId.fromString(str));
        } catch (IllegalArgumentException e) {
            return ServiceApiUtil.jsonSerDeser.toJson(getStatus(str));
        }
    }

    private String getStatusByAppId(ApplicationId applicationId) throws IOException, YarnException {
        ApplicationReport applicationReport = this.yarnClient.getApplicationReport(applicationId);
        return (applicationReport.getYarnApplicationState() == YarnApplicationState.RUNNING && !StringUtils.isEmpty(applicationReport.getHost())) ? createAMProxy(applicationReport.getName(), applicationReport).getStatus(ClientAMProtocol.GetStatusRequestProto.newBuilder().build()).getStatus() : "";
    }

    public Service getStatus(String str) throws IOException, YarnException {
        ServiceApiUtil.validateNameFormat(str, getConfig());
        Service service = new Service();
        service.setName(str);
        service.setState(ServiceState.STOPPED);
        ApplicationId appId = getAppId(str);
        if (appId == null) {
            LOG.info("Service {} does not have an application ID", str);
            return service;
        }
        service.setId(appId.toString());
        try {
            ApplicationReport applicationReport = this.yarnClient.getApplicationReport(appId);
            if (applicationReport == null) {
                LOG.warn("application ID {} is reported as null", appId);
                return service;
            }
            service.setState(convertState(applicationReport.getYarnApplicationState()));
            ApplicationTimeout applicationTimeout = applicationReport.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
            if (applicationTimeout != null) {
                service.setLifetime(Long.valueOf(applicationTimeout.getRemainingTime()));
            }
            if (applicationReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
                LOG.info("Service {} is at {} state", str, applicationReport.getYarnApplicationState());
                return service;
            }
            if (StringUtils.isEmpty(applicationReport.getHost())) {
                LOG.warn(str + " AM hostname is empty");
                return service;
            }
            Service fromJson = ServiceApiUtil.jsonSerDeser.fromJson(createAMProxy(str, applicationReport).getStatus(ClientAMProtocol.GetStatusRequestProto.newBuilder().build()).getStatus());
            if (applicationTimeout != null) {
                fromJson.setLifetime(Long.valueOf(applicationTimeout.getRemainingTime()));
            }
            return fromJson;
        } catch (ApplicationNotFoundException e) {
            LOG.info("application ID {} doesn't exist", appId);
            return service;
        }
    }

    public YarnClient getYarnClient() {
        return this.yarnClient;
    }

    @Override // org.apache.hadoop.yarn.client.api.AppAdminClient
    public int enableFastLaunch(String str) throws IOException, YarnException {
        return actionDependency(str, true);
    }

    public int actionDependency(String str, boolean z) {
        LOG.info("Running command as user {}", RegistryUtils.currentUser());
        Path dependencyTarGzip = str == null ? this.fs.getDependencyTarGzip() : new Path(str, "service-dep.tar.gz");
        if (this.fs.isFile(dependencyTarGzip) && !z) {
            System.out.println(String.format("Dependency libs are already uploaded to %s.", dependencyTarGzip.toUri()));
            return 0;
        }
        String[] libDirs = ServiceUtils.getLibDirs();
        if (libDirs.length <= 0) {
            return -1;
        }
        File file = null;
        try {
            try {
                if (!checkPermissions(dependencyTarGzip)) {
                    if (0 != 0 && !file.delete()) {
                        LOG.warn("Failed to delete tmp file {}", (Object) null);
                    }
                    return 41;
                }
                file = File.createTempFile("service-dep_", YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT);
                ServiceUtils.tarGzipFolder(libDirs, file, ServiceUtils.createJarFilter());
                this.fs.copyLocalFileToHdfs(file, dependencyTarGzip, new FsPermission(YarnServiceConstants.DEPENDENCY_DIR_PERMISSIONS));
                LOG.info("To let apps use this tarball, in yarn-site set config property {} to {}", YarnServiceConf.DEPENDENCY_TARBALL_PATH, dependencyTarGzip);
                if (file != null && !file.delete()) {
                    LOG.warn("Failed to delete tmp file {}", file);
                }
                return 0;
            } catch (IOException e) {
                LOG.error("Got exception creating tarball and uploading to HDFS", (Throwable) e);
                if (file != null && !file.delete()) {
                    LOG.warn("Failed to delete tmp file {}", file);
                }
                return 56;
            }
        } catch (Throwable th) {
            if (file != null && !file.delete()) {
                LOG.warn("Failed to delete tmp file {}", file);
            }
            throw th;
        }
    }

    private boolean checkPermissions(Path path) throws IOException {
        AccessControlList accessControlList = new AccessControlList(getConfig().get(YarnConfiguration.YARN_ADMIN_ACL, "*"));
        AccessControlList accessControlList2 = new AccessControlList(getConfig().get(DFSConfigKeys.DFS_ADMIN, " "));
        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        if (!accessControlList.isUserAllowed(currentUser) && !accessControlList2.isUserAllowed(currentUser)) {
            LOG.error("User must be on the {} or {} list to have permission to upload AM dependency tarball", YarnConfiguration.YARN_ADMIN_ACL, DFSConfigKeys.DFS_ADMIN);
            return false;
        }
        Path parent = path.getParent();
        while (true) {
            Path path2 = parent;
            if (path2 == null) {
                return true;
            }
            if (this.fs.getFileSystem().exists(path2) && !this.fs.getFileSystem().getFileStatus(path2).getPermission().getOtherAction().implies(FsAction.READ_EXECUTE)) {
                LOG.error("Parent directory {} of {} tarball location {} does not have world read/execute permission", path2, YarnServiceConf.DEPENDENCY_TARBALL_PATH, path);
                return false;
            }
            parent = path2.getParent();
        }
    }

    protected org.apache.hadoop.yarn.service.ClientAMProtocol createAMProxy(String str, ApplicationReport applicationReport) throws IOException, YarnException {
        if (UserGroupInformation.isSecurityEnabled()) {
            if (!this.cachedAppInfo.containsKey(str)) {
                this.cachedAppInfo.put(str, new AppInfo(applicationReport.getApplicationId(), ServiceApiUtil.loadService(this.fs, str).getKerberosPrincipal().getPrincipalName()));
            }
            String str2 = this.cachedAppInfo.get(str).principalName;
            if (StringUtils.isEmpty(str2)) {
                throw new YarnException("No principal specified in the persisted service definition, fail to connect to AM.");
            }
            getConfig().set(YarnServiceConstants.PRINCIPAL, str2);
        }
        return (org.apache.hadoop.yarn.service.ClientAMProtocol) ClientAMProxy.createProxy(getConfig(), org.apache.hadoop.yarn.service.ClientAMProtocol.class, UserGroupInformation.getCurrentUser(), this.rpc, NetUtils.createSocketAddrForHost(applicationReport.getHost(), applicationReport.getRpcPort()));
    }

    @VisibleForTesting
    void setFileSystem(SliderFileSystem sliderFileSystem) throws IOException {
        this.fs = sliderFileSystem;
    }

    @VisibleForTesting
    void setYarnClient(YarnClient yarnClient) {
        this.yarnClient = yarnClient;
    }

    public synchronized ApplicationId getAppId(String str) throws IOException, YarnException {
        if (this.cachedAppInfo.containsKey(str)) {
            return this.cachedAppInfo.get(str).appId;
        }
        Service loadService = ServiceApiUtil.loadService(this.fs, str);
        if (loadService == null) {
            throw new YarnException("Service " + str + " doesn't exist on hdfs. Please check if the app exists in RM");
        }
        if (loadService.getId() == null) {
            return null;
        }
        ApplicationId fromString = ApplicationId.fromString(loadService.getId());
        this.cachedAppInfo.put(str, new AppInfo(fromString, loadService.getKerberosPrincipal().getPrincipalName()));
        return fromString;
    }
}
