/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.TestingResourceManagerService;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.testutils.TestingUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests that exercise how the resource manager reacts to cluster partition reports sent by task
 * executors, and to task executors going away.
 *
 * <p>Every test registers two task executors with a freshly started resource manager (see {@link
 * #runTest}). Only the first task executor gateway is customized by the test; the second one is
 * created with the builder defaults.
 */
class ResourceManagerPartitionLifecycleTest {

    /** RPC service shared by all tests in this class; created once and terminated at the end. */
    private static TestingRpcService rpcService;

    /** Service started by {@link #createAndStartResourceManager()}; {@code null} until then. */
    private TestingResourceManagerService resourceManagerService;

    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @BeforeEach
    void setup() {
        // Intentionally empty: the resource manager is created inside runTest(...).
    }

    /**
     * Surfaces any fatal error recorded by the resource manager service and tears the service
     * down.
     *
     * <p>The clean-up runs in a {@code finally} block so that a rethrown fatal error does not
     * leave the service running.
     */
    @AfterEach
    void after() throws Exception {
        if (resourceManagerService == null) {
            return;
        }
        try {
            resourceManagerService.rethrowFatalErrorIfAny();
        } finally {
            resourceManagerService.cleanUp();
        }
    }

    @AfterAll
    static void tearDownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(new RpcService[] {rpcService});
        }
    }

    /**
     * Sends two heartbeats from the same task executor for the same data set, the second one
     * reporting one partition fewer, and expects the release consumer of that task executor to be
     * invoked with the data set.
     */
    @Test
    void testClusterPartitionReportHandling() throws Exception {
        final CompletableFuture<Collection<IntermediateDataSetID>> clusterPartitionReleaseFuture =
                new CompletableFuture<>();

        runTest(
                builder ->
                        builder.setReleaseClusterPartitionsConsumer(
                                clusterPartitionReleaseFuture::complete),
                (resourceManagerGateway, taskManagerId1, ignored) -> {
                    final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
                    final ResultPartitionID resultPartitionID = new ResultPartitionID();

                    // First report: both partitions of the data set are present.
                    resourceManagerGateway.heartbeatFromTaskManager(
                            taskManagerId1,
                            createTaskExecutorHeartbeatPayload(
                                    dataSetID, 2, resultPartitionID, new ResultPartitionID()));

                    // Second report: one partition is missing; the release is expected to follow.
                    resourceManagerGateway.heartbeatFromTaskManager(
                            taskManagerId1,
                            createTaskExecutorHeartbeatPayload(dataSetID, 2, resultPartitionID));

                    final Collection<IntermediateDataSetID> releasedDataSets =
                            clusterPartitionReleaseFuture.get();
                    Assertions.assertThat(releasedDataSets).contains(dataSetID);
                });
    }

    /**
     * Lets both task executors report one partition of the same data set, disconnects the second
     * task executor, and expects the release consumer of the first one to be invoked with the
     * data set.
     */
    @Test
    void testTaskExecutorShutdownHandling() throws Exception {
        final CompletableFuture<Collection<IntermediateDataSetID>> clusterPartitionReleaseFuture =
                new CompletableFuture<>();

        runTest(
                builder ->
                        builder.setReleaseClusterPartitionsConsumer(
                                clusterPartitionReleaseFuture::complete),
                (resourceManagerGateway, taskManagerId1, taskManagerId2) -> {
                    final IntermediateDataSetID dataSetID = new IntermediateDataSetID();

                    resourceManagerGateway.heartbeatFromTaskManager(
                            taskManagerId1,
                            createTaskExecutorHeartbeatPayload(
                                    dataSetID, 2, new ResultPartitionID()));

                    // The second task executor hosts the other partition, so that something is
                    // left on the first one after the disconnect below.
                    resourceManagerGateway.heartbeatFromTaskManager(
                            taskManagerId2,
                            createTaskExecutorHeartbeatPayload(
                                    dataSetID, 2, new ResultPartitionID()));

                    resourceManagerGateway.disconnectTaskManager(
                            taskManagerId2, new RuntimeException("test exception"));

                    final Collection<IntermediateDataSetID> releasedDataSets =
                            clusterPartitionReleaseFuture.get();
                    Assertions.assertThat(releasedDataSets).contains(dataSetID);
                });
    }

    /**
     * Starts a resource manager, registers two task executors with it and runs the given action.
     *
     * @param taskExecutorBuilderSetup customization applied to the builder of the first task
     *     executor gateway only
     * @param testAction action receiving the resource manager gateway and the resource IDs of the
     *     first and second task executor, in that order
     */
    private void runTest(TaskExecutorSetup taskExecutorBuilderSetup, TestAction testAction)
            throws Exception {
        final ResourceManagerGateway resourceManagerGateway = createAndStartResourceManager();

        final TestingTaskExecutorGatewayBuilder firstGatewayBuilder =
                new TestingTaskExecutorGatewayBuilder();
        taskExecutorBuilderSetup.accept(firstGatewayBuilder);
        final TestingTaskExecutorGateway taskExecutorGateway1 =
                firstGatewayBuilder
                        .setAddress(UUID.randomUUID().toString())
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway1.getAddress(), taskExecutorGateway1);

        final TestingTaskExecutorGateway taskExecutorGateway2 =
                new TestingTaskExecutorGatewayBuilder()
                        .setAddress(UUID.randomUUID().toString())
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway2.getAddress(), taskExecutorGateway2);

        final ResourceID taskManagerId1 = ResourceID.generate();
        final ResourceID taskManagerId2 = ResourceID.generate();
        registerTaskExecutor(
                resourceManagerGateway, taskManagerId1, taskExecutorGateway1.getAddress());
        registerTaskExecutor(
                resourceManagerGateway, taskManagerId2, taskExecutorGateway2.getAddress());

        testAction.accept(resourceManagerGateway, taskManagerId1, taskManagerId2);
    }

    /**
     * Registers a task executor at the given resource manager and asserts that the registration
     * eventually succeeds.
     *
     * <p>Apart from the ID and the address, the registration is filled with fixed placeholder
     * values.
     *
     * @param resourceManagerGateway gateway of the resource manager to register at
     * @param taskExecutorId resource ID of the task executor
     * @param taskExecutorAddress address of the task executor; passed as both the first and the
     *     last constructor argument of the registration
     */
    static void registerTaskExecutor(
            ResourceManagerGateway resourceManagerGateway,
            ResourceID taskExecutorId,
            String taskExecutorAddress) {
        final TaskExecutorRegistration taskExecutorRegistration =
                new TaskExecutorRegistration(
                        taskExecutorAddress,
                        taskExecutorId,
                        1234,
                        23456,
                        new HardwareDescription(42, 1337L, 1337L, 0L),
                        new TaskExecutorMemoryConfiguration(
                                1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                        ResourceProfile.ZERO,
                        ResourceProfile.ZERO,
                        taskExecutorAddress);

        final CompletableFuture<RegistrationResponse> registrationFuture =
                resourceManagerGateway.registerTaskExecutor(
                        taskExecutorRegistration, TestingUtils.TIMEOUT);

        FlinkAssertions.assertThatFuture(registrationFuture)
                .eventuallySucceeds()
                .isInstanceOf(RegistrationResponse.Success.class);
    }

    /**
     * Builds and starts the resource manager service, grants it leadership and returns its
     * gateway.
     *
     * @return the gateway of the started resource manager
     * @throws AssertionError if no gateway is available after leadership was confirmed
     */
    private ResourceManagerGateway createAndStartResourceManager() throws Exception {
        final TestingLeaderElection leaderElection = new TestingLeaderElection();

        resourceManagerService =
                TestingResourceManagerService.newBuilder()
                        .setRpcService(rpcService)
                        .setRmLeaderElection(leaderElection)
                        .build();
        resourceManagerService.start();

        // Block until the leadership grant has been processed.
        resourceManagerService.isLeader(UUID.randomUUID()).join();

        return resourceManagerService
                .getResourceManagerGateway()
                .orElseThrow(
                        () ->
                                new AssertionError(
                                        "RM not available after confirming leadership."));
    }

    /**
     * Creates a heartbeat payload with an empty slot report and a cluster partition report that
     * contains a single entry for the given data set.
     *
     * @param dataSetId data set the reported partitions belong to
     * @param numTotalPartitions total partition count recorded in the report entry
     * @param partitionIds IDs of the partitions to report; each one is wrapped in a {@link
     *     TestingShuffleDescriptor}
     * @return the assembled heartbeat payload
     */
    private static TaskExecutorHeartbeatPayload createTaskExecutorHeartbeatPayload(
            IntermediateDataSetID dataSetId,
            int numTotalPartitions,
            ResultPartitionID... partitionIds) {
        final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors =
                Arrays.stream(partitionIds)
                        .map(TestingShuffleDescriptor::new)
                        .collect(
                                Collectors.toMap(
                                        TestingShuffleDescriptor::getResultPartitionID, d -> d));

        final ClusterPartitionReport.ClusterPartitionReportEntry reportEntry =
                new ClusterPartitionReport.ClusterPartitionReportEntry(
                        dataSetId, numTotalPartitions, shuffleDescriptors);

        return new TaskExecutorHeartbeatPayload(
                new SlotReport(),
                new ClusterPartitionReport(Collections.singletonList(reportEntry)));
    }

    /** Customizes the builder of the first task executor gateway; may throw. */
    @FunctionalInterface
    private interface TaskExecutorSetup {
        void accept(TestingTaskExecutorGatewayBuilder taskExecutorGatewayBuilder) throws Exception;
    }

    /** Body of a test, run once both task executors are registered; may throw. */
    @FunctionalInterface
    private interface TestAction {
        void accept(
                ResourceManagerGateway resourceManagerGateway,
                ResourceID taskExecutorId1,
                ResourceID taskExecutorId2)
                throws Exception;
    }

    /** Minimal {@link ShuffleDescriptor} that only carries a result partition ID. */
    private static class TestingShuffleDescriptor implements ShuffleDescriptor {

        private final ResultPartitionID resultPartitionID;

        private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) {
            this.resultPartitionID = resultPartitionID;
        }

        @Override
        public ResultPartitionID getResultPartitionID() {
            return resultPartitionID;
        }

        @Override
        public Optional<ResourceID> storesLocalResourcesOn() {
            return Optional.empty();
        }
    }
}

