/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.AbstractSlotSharingStrategyTest;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.LocalInputPreferredSlotSharingStrategy;
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class LocalInputPreferredSlotSharingStrategyTest
extends AbstractSlotSharingStrategyTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private final JobVertexID jobVertexId3 = new JobVertexID();
    private TestingSchedulingExecutionVertex ev11;
    private TestingSchedulingExecutionVertex ev12;
    private TestingSchedulingExecutionVertex ev21;
    private TestingSchedulingExecutionVertex ev22;
    private TestingSchedulingExecutionVertex ev23;

    LocalInputPreferredSlotSharingStrategyTest() {
    }

    @Override
    protected SlotSharingStrategy getSlotSharingStrategy(SchedulingTopology topology, Set<SlotSharingGroup> slotSharingGroups, Set<CoLocationGroup> coLocationGroups) {
        return new LocalInputPreferredSlotSharingStrategy(topology, slotSharingGroups, coLocationGroups);
    }

    @Test
    void testInputLocalityIsRespectedWithRescaleEdge() {
        this.createTwoExeVerticesPerJv1AndJv2(this.slotSharingGroup);
        this.ev23 = this.topology.newExecutionVertex(this.jobVertexId2, 2);
        this.topology.connect(this.ev11, this.ev21);
        this.topology.connect(this.ev11, this.ev22);
        this.topology.connect(this.ev12, this.ev23);
        SlotSharingStrategy strategy = this.getSlotSharingStrategy(this.topology, Sets.newHashSet((Object[])new SlotSharingGroup[]{this.slotSharingGroup}), Collections.emptySet());
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroups()).hasSize(3);
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev21.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev11.getId(), this.ev21.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev22.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev22.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev23.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev12.getId(), this.ev23.getId()});
    }

    private void createTwoExeVerticesPerJv1AndJv2(SlotSharingGroup sharingGroup) {
        this.ev11 = this.topology.newExecutionVertex(this.jobVertexId1, 0);
        this.ev12 = this.topology.newExecutionVertex(this.jobVertexId1, 1);
        this.ev21 = this.topology.newExecutionVertex(this.jobVertexId2, 0);
        this.ev22 = this.topology.newExecutionVertex(this.jobVertexId2, 1);
        sharingGroup.addVertexToGroup(this.jobVertexId1);
        sharingGroup.addVertexToGroup(this.jobVertexId2);
    }

    @Test
    void testInputLocalityIsRespectedWithAllToAllEdge() {
        this.slotSharingGroup.addVertexToGroup(this.jobVertexId1);
        this.slotSharingGroup.addVertexToGroup(this.jobVertexId2);
        List<TestingSchedulingExecutionVertex> producer = this.topology.addExecutionVertices().withParallelism(2).withJobVertexID(this.jobVertexId1).finish();
        List<TestingSchedulingExecutionVertex> consumer = this.topology.addExecutionVertices().withParallelism(2).withJobVertexID(this.jobVertexId2).finish();
        this.topology.connectAllToAll(producer, consumer).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
        this.ev11 = producer.get(0);
        this.ev12 = producer.get(1);
        this.ev21 = consumer.get(0);
        this.ev22 = consumer.get(1);
        SlotSharingStrategy strategy = this.getSlotSharingStrategy(this.topology, Sets.newHashSet((Object[])new SlotSharingGroup[]{this.slotSharingGroup}), Collections.emptySet());
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroups()).hasSize(2);
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev21.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev11.getId(), this.ev21.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev22.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev12.getId(), this.ev22.getId()});
    }

    @Test
    void testCoLocationConstraintIsRespected() {
        ArrayList<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> jobVertexInfos = new ArrayList<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>>();
        CoLocationGroupImpl coLocationGroup1 = new CoLocationGroupImpl(new JobVertex[0]);
        CoLocationGroupImpl coLocationGroup2 = new CoLocationGroupImpl(new JobVertex[0]);
        ArrayList mockedJobVertices = Lists.newArrayList((Object[])new AbstractSlotSharingStrategyTest.TestingJobVertexInfo[]{new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(1, this.slotSharingGroup, null), new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(2, this.slotSharingGroup, (CoLocationGroup)coLocationGroup1), new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(2, this.slotSharingGroup, (CoLocationGroup)coLocationGroup1), new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(3, this.slotSharingGroup, (CoLocationGroup)coLocationGroup2), new AbstractSlotSharingStrategyTest.TestingJobVertexInfo(3, this.slotSharingGroup, (CoLocationGroup)coLocationGroup2)});
        this.renderTopology(this.topology, mockedJobVertices, jobVertexInfos);
        SlotSharingStrategy strategy = this.getSlotSharingStrategy(this.topology, Sets.newHashSet((Object[])new SlotSharingGroup[]{this.slotSharingGroup}), Sets.newHashSet((Object[])new CoLocationGroup[]{coLocationGroup1, coLocationGroup2}));
        List executionVertices1 = (List)((Tuple2)jobVertexInfos.get((int)1)).f1;
        List executionVertices2 = (List)((Tuple2)jobVertexInfos.get((int)2)).f1;
        Assertions.assertThat((List)executionVertices1).hasSameSizeAs((Iterable)executionVertices2);
        for (int i = 0; i < executionVertices1.size(); ++i) {
            ExecutionSlotSharingGroup executionSlotSharingGroup = strategy.getExecutionSlotSharingGroup(((TestingSchedulingExecutionVertex)executionVertices1.get(i)).getId());
            Assertions.assertThat((Object)executionSlotSharingGroup).isEqualTo((Object)strategy.getExecutionSlotSharingGroup(((TestingSchedulingExecutionVertex)executionVertices2.get(i)).getId()));
        }
        List executionVertices3 = (List)((Tuple2)jobVertexInfos.get((int)3)).f1;
        List executionVertices4 = (List)((Tuple2)jobVertexInfos.get((int)4)).f1;
        Assertions.assertThat((List)executionVertices3).hasSameSizeAs((Iterable)executionVertices4);
        for (int i = 0; i < executionVertices3.size(); ++i) {
            Assertions.assertThat((Object)strategy.getExecutionSlotSharingGroup(((TestingSchedulingExecutionVertex)executionVertices3.get(i)).getId())).isEqualTo((Object)strategy.getExecutionSlotSharingGroup(((TestingSchedulingExecutionVertex)executionVertices4.get(i)).getId()));
        }
    }

    @Test
    void testDisjointVerticesInOneGroup() {
        this.createTwoExeVerticesPerJv1AndJv2(this.slotSharingGroup);
        SlotSharingStrategy strategy = this.getSlotSharingStrategy(this.topology, Sets.newHashSet((Object[])new SlotSharingGroup[]{this.slotSharingGroup}), Collections.emptySet());
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroups()).hasSize(2);
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev11.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev11.getId(), this.ev21.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev12.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev12.getId(), this.ev22.getId()});
    }

    @Test
    void testVerticesInDifferentSlotSharingGroups() {
        this.ev11 = this.topology.newExecutionVertex(this.jobVertexId1, 0);
        this.ev12 = this.topology.newExecutionVertex(this.jobVertexId1, 1);
        this.ev21 = this.topology.newExecutionVertex(this.jobVertexId2, 0);
        this.ev22 = this.topology.newExecutionVertex(this.jobVertexId2, 1);
        this.slotSharingGroup1.addVertexToGroup(this.jobVertexId1);
        this.slotSharingGroup2.addVertexToGroup(this.jobVertexId2);
        SlotSharingStrategy strategy = this.getSlotSharingStrategy(this.topology, Sets.newHashSet((Object[])new SlotSharingGroup[]{this.slotSharingGroup1, this.slotSharingGroup2}), Collections.emptySet());
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroups()).hasSize(4);
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev11.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev11.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev12.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev12.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev21.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev21.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(this.ev22.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{this.ev22.getId()});
    }

    @Test
    void testInputLocalityIsRespectedWithTwoEdgesBetweenTwoVertices() throws Exception {
        this.createTwoExeVerticesPerJv1AndJv2(this.slotSharingGroup);
        int parallelism = 4;
        JobVertex v1 = LocalInputPreferredSlotSharingStrategyTest.createJobVertex("v1", this.jobVertexId1, parallelism);
        JobVertex v2 = LocalInputPreferredSlotSharingStrategyTest.createJobVertex("v2", this.jobVertexId2, parallelism);
        JobVertexConnectionUtils.connectNewDataSetAsInput(v2, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        JobVertexConnectionUtils.connectNewDataSetAsInput(v2, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        Assertions.assertThat((List)v1.getProducedDataSets()).hasSize(2);
        Assertions.assertThat((List)v2.getInputs()).hasSize(2);
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(v1, v2);
        DefaultExecutionGraph executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
        SchedulingTopology topology = executionGraph.getSchedulingTopology();
        SlotSharingStrategy strategy = this.getSlotSharingStrategy(topology, Sets.newHashSet((Object[])new SlotSharingGroup[]{this.slotSharingGroup}), Collections.emptySet());
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroups()).hasSize(4);
        ExecutionVertex[] ev1 = Objects.requireNonNull(executionGraph.getJobVertex(this.jobVertexId1)).getTaskVertices();
        ExecutionVertex[] ev2 = Objects.requireNonNull(executionGraph.getJobVertex(this.jobVertexId2)).getTaskVertices();
        for (int i = 0; i < parallelism; ++i) {
            Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(ev1[i].getID()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{ev1[i].getID(), ev2[i].getID()});
        }
    }

    @Test
    void testGetExecutionSlotSharingGroupOfLateAttachedVertices() {
        this.slotSharingGroup1.addVertexToGroup(this.jobVertexId1);
        this.slotSharingGroup1.addVertexToGroup(this.jobVertexId2);
        this.slotSharingGroup2.addVertexToGroup(this.jobVertexId3);
        TestingSchedulingExecutionVertex ev1 = this.topology.newExecutionVertex(this.jobVertexId1, 0);
        TestingSchedulingExecutionVertex ev2 = this.topology.newExecutionVertex(this.jobVertexId2, 0);
        this.topology.connect(ev1, ev2);
        LocalInputPreferredSlotSharingStrategy strategy = (LocalInputPreferredSlotSharingStrategy)this.getSlotSharingStrategy(this.topology, new HashSet<SlotSharingGroup>(Arrays.asList(this.slotSharingGroup1, this.slotSharingGroup2)), Collections.emptySet());
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroups()).hasSize(1);
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(ev1.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{ev1.getId(), ev2.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(ev2.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{ev1.getId(), ev2.getId()});
        TestingSchedulingExecutionVertex ev3 = this.topology.newExecutionVertex(this.jobVertexId3, 0);
        this.topology.connect(ev2, ev3, ResultPartitionType.BLOCKING);
        strategy.notifySchedulingTopologyUpdated((SchedulingTopology)this.topology, Collections.singletonList(ev3.getId()));
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroups()).hasSize(2);
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(ev1.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{ev1.getId(), ev2.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(ev2.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{ev1.getId(), ev2.getId()});
        Assertions.assertThat((Collection)strategy.getExecutionSlotSharingGroup(ev3.getId()).getExecutionVertexIds()).contains((Object[])new ExecutionVertexID[]{ev3.getId()});
    }

    private static JobVertex createJobVertex(String vertexName, JobVertexID vertexId, int parallelism) {
        JobVertex jobVertex = new JobVertex(vertexName, vertexId);
        jobVertex.setParallelism(parallelism);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }
}

