/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JMXServerOptions;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.core.security.FlinkSecurityManager;
import org.apache.flink.management.jmx.JMXService;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.TaskExecutorBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.DeterminismEnvelope;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.TraceReporterSetup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemUtils;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.taskexecutor.HostBindPolicy;
import org.apache.flink.runtime.taskexecutor.SystemOutRedirectionUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToServiceAdapter;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.taskmanager.MemoryLogger;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TaskManagerExceptionUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskManagerRunner
implements FatalErrorHandler {
    /** Logger used by both the instance methods and the static entry points of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
    /** Time limit in milliseconds (10000) matching the bound onFatalError puts on the shutdown it triggers. */
    private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
    /** Exit code 0; the same value Result.SUCCESS reports. */
    private static final int SUCCESS_EXIT_CODE = 0;
    /** Exit code 1; the same value Result.JVM_SHUTDOWN and Result.FAILURE report. */
    @VisibleForTesting
    public static final int FAILURE_EXIT_CODE = 1;
    /** JVM shutdown hook registered in the constructor and removed again in closeAsync(Result). */
    private final Thread shutdownHook;
    /** Monitor that guards every field annotated with @GuardedBy below. */
    private final Object lock = new Object();
    private final Configuration configuration;
    /** Read from RpcOptions.ASK_TIMEOUT_DURATION; used as the executor shutdown timeout in shutDownServices. */
    private final Duration timeout;
    private final PluginManager pluginManager;
    /** Creates the TaskExecutorService once all auxiliary services have been started. */
    private final TaskExecutorServiceFactory taskExecutorServiceFactory;
    /** Completed by closeAsync(Result) once the shutdown chain has finished. */
    private final CompletableFuture<Result> terminationFuture;
    // The fields below are assigned in startTaskManagerRunnerServices() and stay null until then.
    @GuardedBy(value="lock")
    private DeterminismEnvelope<ResourceID> resourceId;
    @GuardedBy(value="lock")
    private ExecutorService executor;
    @GuardedBy(value="lock")
    private RpcSystem rpcSystem;
    @GuardedBy(value="lock")
    private RpcService rpcService;
    @GuardedBy(value="lock")
    private HighAvailabilityServices highAvailabilityServices;
    @GuardedBy(value="lock")
    private MetricRegistryImpl metricRegistry;
    @GuardedBy(value="lock")
    private BlobCacheService blobCacheService;
    @GuardedBy(value="lock")
    private DeterminismEnvelope<WorkingDirectory> workingDirectory;
    @GuardedBy(value="lock")
    private TaskExecutorService taskExecutorService;
    /** Set to true by closeAsync(Result) after the shutdown chain has been set up. */
    @GuardedBy(value="lock")
    private boolean shutdown;

    /**
     * Creates a runner that has not yet started any service.
     *
     * <p>Besides storing its (non-null) arguments, the constructor reads the RPC ask timeout from
     * the configuration and registers a JVM shutdown hook that closes the runner with
     * {@link Result#JVM_SHUTDOWN} and waits for the shutdown to finish.
     *
     * @param configuration configuration of the TaskManager process; must not be null
     * @param pluginManager plugin manager handed to the services created later; must not be null
     * @param taskExecutorServiceFactory factory for the task executor service; must not be null
     * @throws Exception declared by the signature for callers
     */
    public TaskManagerRunner(Configuration configuration, PluginManager pluginManager, TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
        this.configuration = Preconditions.checkNotNull(configuration);
        this.pluginManager = Preconditions.checkNotNull(pluginManager);
        this.taskExecutorServiceFactory = Preconditions.checkNotNull(taskExecutorServiceFactory);

        this.timeout = configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
        this.terminationFuture = new CompletableFuture<>();
        this.shutdown = false;

        this.shutdownHook =
                ShutdownHookUtil.addShutdownHook(
                        () -> closeAsync(Result.JVM_SHUTDOWN).join(),
                        getClass().getSimpleName(),
                        LOG);
    }

    /**
     * Creates every service the task executor depends on and finally the task executor service
     * itself, all while holding {@code lock}.
     *
     * <p>The statements are order-dependent: later services are built from earlier ones (for
     * example the resource id is derived from the RPC service address and port, and the working
     * directory from the resource id).
     *
     * @throws Exception if any of the services cannot be created
     */
    private void startTaskManagerRunnerServices() throws Exception {
        synchronized (lock) {
            rpcSystem = RpcSystem.load(configuration);

            executor =
                    Executors.newScheduledThreadPool(
                            Hardware.getNumberCPUCores(),
                            new ExecutorThreadFactory("taskmanager-future"));

            highAvailabilityServices =
                    HighAvailabilityServicesUtils.createHighAvailabilityServices(
                            configuration,
                            executor,
                            AddressResolution.NO_ADDRESS_RESOLUTION,
                            rpcSystem,
                            this);

            JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));

            rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);

            resourceId =
                    getTaskManagerResourceID(
                            configuration, rpcService.getAddress(), rpcService.getPort());

            workingDirectory =
                    ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
                            configuration, resourceId);
            LOG.info("Using working directory: {}", workingDirectory);

            final HeartbeatServices heartbeatServices =
                    HeartbeatServices.fromConfiguration(configuration);

            metricRegistry =
                    new MetricRegistryImpl(
                            MetricRegistryConfiguration.fromConfiguration(
                                    configuration,
                                    rpcSystem.getMaximumMessageSizeInBytes(configuration)),
                            ReporterSetup.fromConfiguration(configuration, pluginManager),
                            TraceReporterSetup.fromConfiguration(configuration, pluginManager));

            // The metric query service gets its own RPC service, separate from rpcService.
            final RpcService metricQueryServiceRpcService =
                    MetricUtils.startRemoteMetricsRpcService(
                            configuration,
                            rpcService.getAddress(),
                            configuration.get(TaskManagerOptions.BIND_HOST),
                            rpcSystem);
            metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());

            blobCacheService =
                    BlobUtils.createBlobCacheService(
                            configuration,
                            Reference.borrowed(
                                    workingDirectory.unwrap().getBlobStorageDirectory()),
                            highAvailabilityServices.createBlobStore(),
                            null);

            final ExternalResourceInfoProvider externalResourceInfoProvider =
                    ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
                            configuration, pluginManager);

            final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
                    new DelegationTokenReceiverRepository(configuration, pluginManager);

            taskExecutorService =
                    taskExecutorServiceFactory.createTaskExecutor(
                            configuration,
                            resourceId.unwrap(),
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            metricRegistry,
                            blobCacheService,
                            false,
                            externalResourceInfoProvider,
                            workingDirectory.unwrap(),
                            this,
                            delegationTokenReceiverRepository);

            handleUnexpectedTaskExecutorServiceTermination();

            MemoryLogger.startIfConfigured(
                    LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
        }
    }

    /**
     * Registers a callback on the task executor service's termination future that reports a fatal
     * error when the service terminates although this runner has not been shut down.
     */
    @GuardedBy("lock")
    private void handleUnexpectedTaskExecutorServiceTermination() {
        final CompletableFuture<Void> executorTermination =
                taskExecutorService.getTerminationFuture();

        executorTermination.whenComplete(
                (ignored, failure) -> {
                    synchronized (lock) {
                        if (shutdown) {
                            // Termination was requested through closeAsync; nothing to report.
                            return;
                        }
                        onFatalError(
                                new FlinkException(
                                        "Unexpected termination of the TaskExecutor.", failure));
                    }
                });
    }

    /**
     * Starts all auxiliary services and then the task executor service, holding {@code lock} for
     * the whole sequence.
     *
     * @throws Exception if a service cannot be created or started
     */
    public void start() throws Exception {
        synchronized (lock) {
            startTaskManagerRunnerServices();
            taskExecutorService.start();
        }
    }

    /**
     * Closes the runner and blocks until the shutdown has finished.
     *
     * @throws Exception the failure of the shutdown with the {@link ExecutionException} wrapper
     *     stripped, or whatever else waiting on the future throws
     */
    public void close() throws Exception {
        final CompletableFuture<Result> closing = closeAsync();
        try {
            closing.get();
        } catch (ExecutionException wrapped) {
            final Throwable cause = ExceptionUtils.stripExecutionException(wrapped);
            ExceptionUtils.rethrowException(cause);
        }
    }

    /**
     * Requests an asynchronous shutdown whose successful outcome is {@link Result#SUCCESS}.
     *
     * @return the termination future of this runner
     */
    public CompletableFuture<Result> closeAsync() {
        final Result onSuccess = Result.SUCCESS;
        return closeAsync(onSuccess);
    }

    /**
     * Sets up the asynchronous shutdown chain (at most once) and returns the shared termination
     * future.
     *
     * <p>The steps run in this order: close the task executor service (if it was created), shut
     * down the auxiliary services, delete the working directory where applicable, close the RPC
     * system (if it was loaded). The termination future completes with {@code terminationResult}
     * when the whole chain succeeded and exceptionally when any step failed.
     *
     * @param terminationResult value the termination future is completed with on success
     * @return the termination future of this runner; the same instance on every call
     */
    private CompletableFuture<Result> closeAsync(Result terminationResult) {
        synchronized (lock) {
            // Unregister the JVM shutdown hook that the constructor installed.
            ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

            if (shutdown) {
                return terminationFuture;
            }

            final CompletableFuture<Void> taskManagerTerminationFuture =
                    taskExecutorService != null
                            ? taskExecutorService.closeAsync()
                            : FutureUtils.completedVoidFuture();

            final CompletableFuture<Void> serviceTerminationFuture =
                    FutureUtils.composeAfterwards(
                            taskManagerTerminationFuture, this::shutDownServices);

            final CompletableFuture<Void> workingDirCleanupFuture =
                    FutureUtils.runAfterwards(
                            serviceTerminationFuture, () -> deleteWorkingDir(terminationResult));

            // Without an RPC system there is nothing further to close, but termination must still
            // wait for, and report failures of, the preceding cleanup steps. Substituting an
            // already-completed future here would detach the termination future from the chain.
            final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
                    rpcSystem != null
                            ? FutureUtils.runAfterwards(
                                    workingDirCleanupFuture, () -> rpcSystem.close())
                            : workingDirCleanupFuture;

            rpcSystemClassLoaderCloseFuture.whenComplete(
                    (ignored, throwable) -> {
                        if (throwable != null) {
                            terminationFuture.completeExceptionally(throwable);
                        } else {
                            terminationFuture.complete(terminationResult);
                        }
                    });

            shutdown = true;
            return terminationFuture;
        }
    }

    /**
     * Deletes the working directory unless it has to be kept.
     *
     * <p>Nothing happens when no working directory was created. A deterministic working directory
     * is kept whenever the termination result is not {@link Result#SUCCESS}; in every other case
     * the directory is deleted.
     *
     * @param terminationResult the result the runner is terminating with
     * @throws IOException if deleting the directory fails
     */
    private void deleteWorkingDir(Result terminationResult) throws IOException {
        synchronized (lock) {
            if (workingDirectory == null) {
                return;
            }
            if (!workingDirectory.isDeterministic() || terminationResult == Result.SUCCESS) {
                workingDirectory.unwrap().delete();
            }
        }
    }

    /**
     * Shuts down the auxiliary services that were created, skipping those that are still null.
     *
     * <p>Services with a synchronous {@code close()} are closed in place; the first exception is
     * remembered and later ones are attached to it as suppressed. Services that shut down
     * asynchronously contribute a future. The returned future completes once all contributed
     * futures are done and carries the remembered exception, if any.
     *
     * @return future that completes when every service has finished shutting down
     */
    private CompletableFuture<Void> shutDownServices() {
        synchronized (lock) {
            final ArrayList<CompletableFuture<Void>> pendingTerminations = new ArrayList<>(3);
            Exception firstFailure = null;

            try {
                JMXService.stopInstance();
            } catch (Exception e) {
                firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
            }

            if (blobCacheService != null) {
                try {
                    blobCacheService.close();
                } catch (Exception e) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
                }
            }

            if (metricRegistry != null) {
                try {
                    pendingTerminations.add(metricRegistry.closeAsync());
                } catch (Exception e) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
                }
            }

            if (highAvailabilityServices != null) {
                try {
                    highAvailabilityServices.close();
                } catch (Exception e) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
                }
            }

            if (rpcService != null) {
                pendingTerminations.add(rpcService.closeAsync());
            }

            if (executor != null) {
                pendingTerminations.add(
                        ExecutorUtils.nonBlockingShutdown(
                                timeout.toMillis(), TimeUnit.MILLISECONDS, executor));
            }

            // Surface the collected synchronous failures through the combined future.
            if (firstFailure != null) {
                pendingTerminations.add(FutureUtils.completedExceptionally(firstFailure));
            }

            return FutureUtils.completeAll(pendingTerminations);
        }
    }

    /**
     * Returns the future that {@code closeAsync} completes once the shutdown chain has finished.
     *
     * @return the termination future of this runner
     */
    public CompletableFuture<Result> getTerminationFuture() {
        return terminationFuture;
    }

    /**
     * Handles a fatal error reported by one of the TaskManager components.
     *
     * <p>The error is enriched and logged. JVM-fatal errors and out-of-memory errors terminate
     * the process immediately; any other error triggers an asynchronous shutdown with
     * {@link Result#FAILURE}, bounded by {@code FATAL_ERROR_SHUTDOWN_TIMEOUT_MS}.
     *
     * @param exception the fatal error
     */
    @Override
    public void onFatalError(Throwable exception) {
        TaskManagerExceptionUtils.tryEnrichTaskManagerError(exception);
        LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);

        if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) {
            terminateJVM();
            return;
        }

        closeAsync(Result.FAILURE);

        // Use the named constant for both the timeout and its message so the two cannot drift
        // apart; if the shutdown takes longer, the termination future fails with that message.
        FutureUtils.orTimeout(
                terminationFuture,
                FATAL_ERROR_SHUTDOWN_TIMEOUT_MS,
                TimeUnit.MILLISECONDS,
                String.format(
                        "Waiting for TaskManager shutting down timed out after %s ms.",
                        FATAL_ERROR_SHUTDOWN_TIMEOUT_MS));
    }

    /** Forces the process to exit with exit code 1. */
    private void terminateJVM() {
        final int exitCode = 1;
        FlinkSecurityManager.forceProcessExit(exitCode);
    }

    /**
     * Process entry point: logs environment information, registers the signal handler and the
     * JVM shutdown safeguard, logs the open-file-descriptor limit and then runs the TaskManager.
     *
     * @param args command line arguments, forwarded to the configuration loader
     * @throws Exception declared by the signature for callers
     */
    public static void main(String[] args) throws Exception {
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        final long fileHandleLimit = EnvironmentInformation.getOpenFileHandlesLimit();
        // -1 is treated as "limit unknown".
        if (fileHandleLimit == -1L) {
            LOG.info("Cannot determine the maximum number of open file descriptors");
        } else {
            LOG.info("Maximum number of open file descriptors is {}.", fileHandleLimit);
        }

        runTaskManagerProcessSecurely(args);
    }

    /**
     * Loads the configuration from the given command line arguments.
     *
     * @param args command line arguments
     * @return the loaded configuration
     * @throws FlinkParseException if the arguments cannot be parsed
     */
    public static Configuration loadConfiguration(String[] args) throws FlinkParseException {
        final String commandName = TaskManagerRunner.class.getSimpleName();
        return ConfigurationParserUtils.loadCommonConfiguration(args, commandName);
    }

    /**
     * Creates and starts a {@code TaskManagerRunner} and blocks until it terminates.
     *
     * @param configuration configuration of the TaskManager
     * @param pluginManager plugin manager passed to the runner
     * @return the exit code of the result the runner terminated with
     * @throws Exception a {@link FlinkException} wrapping either the startup failure or the
     *     failure the termination future completed with
     */
    public static int runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
        final TaskManagerRunner runner;
        try {
            runner =
                    new TaskManagerRunner(
                            configuration,
                            pluginManager,
                            TaskManagerRunner::createTaskExecutorService);
            runner.start();
        } catch (Exception startupFailure) {
            throw new FlinkException("Failed to start the TaskManagerRunner.", startupFailure);
        }

        try {
            final Result result = runner.getTerminationFuture().get();
            return result.getExitCode();
        } catch (Throwable runtimeFailure) {
            throw new FlinkException(
                    "Unexpected failure during runtime of TaskManagerRunner.",
                    ExceptionUtils.stripExecutionException(runtimeFailure));
        }
    }

    /**
     * Loads the configuration from the command line and runs the TaskManager with it. If the
     * configuration cannot be loaded, the error is logged and the process exits with code 1.
     *
     * @param args command line arguments
     */
    public static void runTaskManagerProcessSecurely(String[] args) {
        Configuration loaded = null;
        try {
            loaded = loadConfiguration(args);
        } catch (FlinkParseException parseFailure) {
            LOG.error("Could not load the configuration.", parseFailure);
            System.exit(1);
        }

        runTaskManagerProcessSecurely(Preconditions.checkNotNull(loaded));
    }

    /**
     * Initializes the process-wide facilities (security manager, plugins, file systems, changelog
     * storage loader, uncaught exception handler), runs the TaskManager inside the installed
     * security context and finally exits the JVM with the resulting exit code.
     *
     * <p>Any throwable raised while installing the security context or running the TaskManager is
     * logged and mapped to exit code 1.
     *
     * @param configuration configuration of the TaskManager process
     */
    public static void runTaskManagerProcessSecurely(Configuration configuration) {
        FlinkSecurityManager.setFromConfiguration(configuration);
        final PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(configuration);
        FileSystem.initialize(configuration, pluginManager);
        StateChangelogStorageLoader.initialize(pluginManager);
        ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);

        int exitCode;
        Throwable failure = null;
        try {
            SecurityUtils.install(new SecurityConfiguration(configuration));
            exitCode =
                    SecurityUtils.getInstalledContext()
                            .runSecured(() -> runTaskManager(configuration, pluginManager));
        } catch (Throwable t) {
            failure = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            exitCode = 1;
        }

        if (failure == null) {
            LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
        } else {
            LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, failure);
        }

        System.exit(exitCode);
    }

    /**
     * Builds a {@code TaskExecutor} via {@code startTaskManager} and wraps it in a
     * {@code TaskExecutorService} adapter. All arguments are forwarded unchanged.
     *
     * @return the task executor service wrapping the newly created task executor
     * @throws Exception if the task executor cannot be created
     */
    public static TaskExecutorService createTaskExecutorService(Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, BlobCacheService blobCacheService, boolean localCommunicationOnly, ExternalResourceInfoProvider externalResourceInfoProvider, WorkingDirectory workingDirectory, FatalErrorHandler fatalErrorHandler, DelegationTokenReceiverRepository delegationTokenReceiverRepository) throws Exception {
        return TaskExecutorToServiceAdapter.createFor(
                startTaskManager(
                        configuration,
                        resourceID,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        localCommunicationOnly,
                        externalResourceInfoProvider,
                        workingDirectory,
                        fatalErrorHandler,
                        delegationTokenReceiverRepository));
    }

    /**
     * Assembles a {@code TaskExecutor} from the given services.
     *
     * <p>After validating the mandatory arguments, the method redirects the standard output and
     * error streams according to the configuration, derives the resource spec and the services
     * configuration, creates the metric groups and a fixed-size I/O thread pool, builds the
     * {@code TaskManagerServices} and finally instantiates the task executor. Despite its name
     * the method only constructs the executor; it does not call {@code start()} on it.
     *
     * @return the newly created task executor
     * @throws Exception if any of the components cannot be created
     */
    public static TaskExecutor startTaskManager(Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, TaskExecutorBlobService taskExecutorBlobService, boolean localCommunicationOnly, ExternalResourceInfoProvider externalResourceInfoProvider, WorkingDirectory workingDirectory, FatalErrorHandler fatalErrorHandler, DelegationTokenReceiverRepository delegationTokenReceiverRepository) throws Exception {
        Preconditions.checkNotNull(configuration);
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(rpcService);
        Preconditions.checkNotNull(highAvailabilityServices);

        LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata());

        SystemOutRedirectionUtils.redirectSystemOutAndError(configuration);

        final String externalAddress = rpcService.getAddress();
        final TaskExecutorResourceSpec resourceSpec =
                TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
        final TaskManagerServicesConfiguration servicesConfiguration =
                TaskManagerServicesConfiguration.fromConfiguration(
                        configuration,
                        resourceID,
                        externalAddress,
                        localCommunicationOnly,
                        resourceSpec,
                        workingDirectory);

        // f0 is passed to the TaskExecutor; f1 is the group handed to the services and the
        // memory metrics below.
        final Tuple2<TaskManagerMetricGroup, MetricGroup> metricGroups =
                MetricUtils.instantiateTaskManagerMetricGroup(
                        metricRegistry,
                        externalAddress,
                        resourceID,
                        servicesConfiguration.getSystemResourceMetricsProbingInterval());

        final ExecutorService ioExecutor =
                Executors.newFixedThreadPool(
                        servicesConfiguration.getNumIoThreads(),
                        new ExecutorThreadFactory("flink-taskexecutor-io"));

        final TaskManagerServices services =
                TaskManagerServices.fromConfiguration(
                        servicesConfiguration,
                        taskExecutorBlobService.getPermanentBlobService(),
                        metricGroups.f1,
                        ioExecutor,
                        rpcService.getScheduledExecutor(),
                        fatalErrorHandler,
                        workingDirectory);

        MetricUtils.instantiateFlinkMemoryMetricGroup(
                metricGroups.f1, services.getTaskSlotTable(), services::getManagedMemorySize);

        final TaskManagerConfiguration taskManagerConfiguration =
                TaskManagerConfiguration.fromConfiguration(
                        configuration,
                        resourceSpec,
                        externalAddress,
                        workingDirectory.getTmpDirectory());

        final String metricQueryServiceAddress =
                metricRegistry.getMetricQueryServiceGatewayRpcAddress();

        return new TaskExecutor(
                rpcService,
                taskManagerConfiguration,
                highAvailabilityServices,
                services,
                externalResourceInfoProvider,
                heartbeatServices,
                metricGroups.f0,
                metricQueryServiceAddress,
                taskExecutorBlobService,
                fatalErrorHandler,
                new TaskExecutorPartitionTrackerImpl(services.getShuffleEnvironment()),
                delegationTokenReceiverRepository);
    }

    /**
     * Creates the remote RPC service of the TaskManager.
     *
     * @param configuration configuration providing the RPC port, bind host and bind port; must
     *     not be null
     * @param haServices high availability services, used when the bind address has to be
     *     determined; must not be null
     * @param rpcSystem RPC system that creates the service
     * @return the created RPC service
     * @throws Exception if the bind address cannot be determined or the service cannot be created
     */
    @VisibleForTesting
    static RpcService createRpcService(Configuration configuration, HighAvailabilityServices haServices, RpcSystem rpcSystem) throws Exception {
        Preconditions.checkNotNull(configuration);
        Preconditions.checkNotNull(haServices);

        final String externalAddress =
                determineTaskManagerBindAddress(configuration, haServices, rpcSystem);
        final String externalPortRange = configuration.get(TaskManagerOptions.RPC_PORT);
        final String bindHost = configuration.get(TaskManagerOptions.BIND_HOST);

        return RpcUtils.createRemoteRpcService(
                rpcSystem,
                configuration,
                externalAddress,
                externalPortRange,
                bindHost,
                configuration.getOptional(TaskManagerOptions.RPC_BIND_PORT));
    }

    /**
     * Returns the address the TaskManager uses: the configured host when
     * {@code TaskManagerOptions.HOST} is set, otherwise the address found by connecting to the
     * resource manager.
     *
     * @throws Exception if the address has to be looked up and the lookup fails
     */
    private static String determineTaskManagerBindAddress(Configuration configuration, HighAvailabilityServices haServices, RpcSystemUtils rpcSystemUtils) throws Exception {
        final String configuredHost = configuration.get(TaskManagerOptions.HOST);

        if (configuredHost == null) {
            return determineTaskManagerBindAddressByConnectingToResourceManager(
                    configuration, haServices, rpcSystemUtils);
        }

        LOG.info("Using configured hostname/address for TaskManager: {}.", configuredHost);
        return configuredHost;
    }

    /**
     * Finds the local address that can connect to the resource manager leader and returns it as
     * an IP address or as a host name, depending on the configured host bind policy.
     *
     * @throws LeaderRetrievalException if no connecting address can be found
     */
    private static String determineTaskManagerBindAddressByConnectingToResourceManager(Configuration configuration, HighAvailabilityServices haServices, RpcSystemUtils rpcSystemUtils) throws LeaderRetrievalException {
        final Duration lookupTimeout = configuration.get(RpcOptions.LOOKUP_TIMEOUT_DURATION);

        final InetAddress connectingAddress =
                LeaderRetrievalUtils.findConnectingAddress(
                        haServices.getResourceManagerLeaderRetriever(),
                        lookupTimeout,
                        rpcSystemUtils);

        LOG.info(
                "TaskManager will use hostname/address '{}' ({}) for communication.",
                connectingAddress.getHostName(),
                connectingAddress.getHostAddress());

        final HostBindPolicy bindPolicy =
                HostBindPolicy.fromString(configuration.get(TaskManagerOptions.HOST_BIND_POLICY));

        if (bindPolicy == HostBindPolicy.IP) {
            return connectingAddress.getHostAddress();
        }
        return connectingAddress.getHostName();
    }

    /**
     * Determines the resource id of the TaskManager.
     *
     * <p>A configured {@code TaskManagerOptions.TASK_MANAGER_RESOURCE_ID} is used as is and
     * wrapped as a deterministic value. Otherwise an id is generated and wrapped as a
     * nondeterministic value: {@code <rpcAddress>:<rpcPort>-<suffix>}, or
     * {@code <localHostName>-<suffix>} when the RPC address is null or blank, where the suffix is
     * the first six characters of a fresh {@code AbstractID}.
     *
     * @param config configuration to read the resource id and its metadata from
     * @param rpcAddress address of the RPC service; may be null or blank
     * @param rpcPort port of the RPC service
     * @return the resource id together with the information whether it is deterministic
     */
    @VisibleForTesting
    static DeterminismEnvelope<ResourceID> getTaskManagerResourceID(Configuration config, String rpcAddress, int rpcPort) {
        final String metadata =
                config.get(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, "");

        return config.getOptional(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID)
                .map(
                        configuredId ->
                                DeterminismEnvelope.deterministicValue(
                                        new ResourceID(configuredId, metadata)))
                .orElseGet(
                        FunctionUtils.uncheckedSupplier(
                                () -> {
                                    // Resolved unconditionally, so a failing local host lookup
                                    // surfaces even when the RPC address is usable.
                                    final String hostName =
                                            InetAddress.getLocalHost().getHostName();

                                    final String prefix;
                                    if (StringUtils.isNullOrWhitespaceOnly(rpcAddress)) {
                                        prefix = hostName;
                                    } else {
                                        prefix = rpcAddress + ":" + rpcPort;
                                    }

                                    final String randomSuffix =
                                            new AbstractID().toString().substring(0, 6);

                                    return DeterminismEnvelope.nondeterministicValue(
                                            new ResourceID(prefix + "-" + randomSuffix, metadata));
                                }));
    }

    /**
     * Factory for the {@link TaskExecutorService} that the runner starts once all auxiliary
     * services are available. The parameters mirror those of
     * {@code TaskManagerRunner.createTaskExecutorService}, which serves as the default
     * implementation via a method reference.
     */
    @FunctionalInterface
    public interface TaskExecutorServiceFactory {
        /**
         * Creates the task executor service from the already started auxiliary services.
         *
         * @return the created task executor service
         * @throws Exception if the service cannot be created
         */
        TaskExecutorService createTaskExecutor(
                Configuration configuration,
                ResourceID resourceID,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry,
                BlobCacheService blobCacheService,
                boolean localCommunicationOnly,
                ExternalResourceInfoProvider externalResourceInfoProvider,
                WorkingDirectory workingDirectory,
                FatalErrorHandler fatalErrorHandler,
                DelegationTokenReceiverRepository delegationTokenReceiverRepository)
                throws Exception;
    }

    public static interface TaskExecutorService
    extends AutoCloseableAsync {
        public void start();

        public CompletableFuture<Void> getTerminationFuture();
    }

    /** Outcome of a TaskManager run together with the process exit code it maps to. */
    public enum Result {
        /** Regular termination; exit code 0. */
        SUCCESS(0),
        /** Termination triggered by the JVM shutdown hook; exit code 1. */
        JVM_SHUTDOWN(1),
        /** Termination caused by a failure; exit code 1. */
        FAILURE(1);

        private final int exitCode;

        Result(int code) {
            exitCode = code;
        }

        /**
         * Returns the process exit code associated with this result.
         *
         * @return the exit code
         */
        public int getExitCode() {
            return exitCode;
        }
    }
}

