/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.executors;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base executor that runs pipelines against an already existing (session) cluster.
 *
 * <p>Every operation resolves the target cluster from the supplied {@link Configuration} via the
 * {@code ClientFactory}, retrieves a {@link ClusterClient} for it and closes that client again
 * once the asynchronous operation has finished.
 *
 * @param <ClusterID> the type of the id used to identify the target cluster
 * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create the {@link
 *     ClusterDescriptor} and to extract the cluster id from the configuration
 */
@Internal
public class AbstractSessionClusterExecutor<
                ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
        implements CacheSupportedPipelineExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionClusterExecutor.class);

    /** Single-threaded executor handed to the job status changed listeners. */
    private final ExecutorService executorService =
            Executors.newFixedThreadPool(
                    1, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));

    private final ClientFactory clusterClientFactory;
    private final Configuration configuration;
    private final List<JobStatusChangedListener> jobStatusChangedListeners;

    /**
     * Creates the executor and instantiates the configured job status changed listeners.
     *
     * @param clusterClientFactory factory used to create cluster descriptors and to read the
     *     cluster id; must not be {@code null}
     * @param configuration configuration used to create the job status changed listeners
     * @throws NullPointerException if {@code clusterClientFactory} is {@code null}
     */
    public AbstractSessionClusterExecutor(
            @Nonnull ClientFactory clusterClientFactory, Configuration configuration) {
        this.clusterClientFactory = Preconditions.checkNotNull(clusterClientFactory);
        this.configuration = configuration;
        this.jobStatusChangedListeners =
                JobStatusChangedListenerUtils.createJobStatusChangedListeners(
                        Thread.currentThread().getContextClassLoader(),
                        configuration,
                        executorService);
    }

    /**
     * Submits the given pipeline to the session cluster identified by {@code configuration}.
     *
     * <p>The returned future completes with a {@link JobClient} after the job has been submitted
     * and its initialization has finished. On success the job status listeners are notified; on
     * failure the error is logged. In both cases the cluster client used for the submission is
     * closed afterwards.
     *
     * @param pipeline the pipeline to execute
     * @param configuration configuration used to translate the pipeline and locate the cluster
     * @param userCodeClassloader class loader used while waiting for job initialization and by
     *     the returned job client
     * @return a future providing the {@link JobClient} of the submitted job
     * @throws IllegalStateException if the configuration does not yield a cluster id
     * @throws Exception if translating the pipeline or retrieving the cluster fails
     */
    public CompletableFuture<JobClient> execute(
            @Nonnull Pipeline pipeline,
            @Nonnull Configuration configuration,
            @Nonnull ClassLoader userCodeClassloader)
            throws Exception {
        final StreamGraph streamGraph =
                PipelineExecutorUtils.getStreamGraph(pipeline, configuration);

        try (ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterID != null);

            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.retrieve(clusterID);
            final ClusterClient<ClusterID> clusterClient =
                    clusterClientProvider.getClusterClient();

            streamGraph.serializeUserDefinedInstances();

            return clusterClient
                    .submitJob(streamGraph)
                    .thenApplyAsync(
                            FunctionUtils.uncheckedFunction(
                                    jobId -> {
                                        // Block this stage until the job has left its
                                        // initialization phase, so that initialization
                                        // failures surface through the returned future.
                                        ClientUtils.waitUntilJobInitializationFinished(
                                                () -> clusterClient.getJobStatus(jobId).get(),
                                                () -> clusterClient.requestJobResult(jobId).get(),
                                                userCodeClassloader);
                                        return jobId;
                                    }))
                    .thenApplyAsync(
                            jobID ->
                                    (JobClient)
                                            new ClusterClientJobClientAdapter<>(
                                                    clusterClientProvider,
                                                    jobID,
                                                    userCodeClassloader))
                    .whenCompleteAsync(
                            (jobClient, throwable) -> {
                                if (throwable == null) {
                                    PipelineExecutorUtils.notifyJobStatusListeners(
                                            pipeline, streamGraph, jobStatusChangedListeners);
                                } else {
                                    LOG.error(
                                            "Failed to submit job graph to remote session cluster.",
                                            throwable);
                                }
                                // The submission client is no longer needed; the job client
                                // adapter obtains its own clients from the provider.
                                clusterClient.close();
                            });
        }
    }

    /**
     * Lists the ids of the completed cluster datasets known to the session cluster.
     *
     * @param configuration configuration used to locate the cluster
     * @param userCodeClassloader not used by this implementation
     * @return a future providing the ids of the completed cluster datasets
     * @throws IllegalStateException if the configuration does not yield a cluster id
     * @throws Exception if retrieving the cluster fails
     */
    public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds(
            Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
        try (ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterID != null);

            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.retrieve(clusterID);
            final ClusterClient<ClusterID> clusterClient =
                    clusterClientProvider.getClusterClient();

            // Release the client once the request has finished (successfully or not),
            // mirroring what execute() does for its submission client.
            return clusterClient
                    .listCompletedClusterDatasetIds()
                    .whenComplete((ignored, throwable) -> clusterClient.close());
        }
    }

    /**
     * Asks the session cluster to invalidate the cluster dataset with the given id.
     *
     * @param clusterDatasetId id of the cluster dataset to invalidate
     * @param configuration configuration used to locate the cluster
     * @param userCodeClassloader not used by this implementation
     * @return a future that completes with {@code null} once the cluster has answered the request
     * @throws IllegalStateException if the configuration does not yield a cluster id
     * @throws Exception if retrieving the cluster fails
     */
    public CompletableFuture<Void> invalidateClusterDataset(
            AbstractID clusterDatasetId,
            Configuration configuration,
            ClassLoader userCodeClassloader)
            throws Exception {
        try (ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {
            final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterID != null);

            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.retrieve(clusterID);
            final ClusterClient<ClusterID> clusterClient =
                    clusterClientProvider.getClusterClient();

            // Close the client before mapping the result so the trailing thenApply can
            // still infer Void from the method's return type.
            return clusterClient
                    .invalidateClusterDataset(new IntermediateDataSetID(clusterDatasetId))
                    .whenComplete((ignored, throwable) -> clusterClient.close())
                    .thenApply(acknowledge -> null);
        }
    }
}

