/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoTracker;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoTrackerBuilder;
import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalListener;
import org.apache.flink.shaded.guava32.com.google.common.cache.RemovalNotification;
import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableSet;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class VertexThreadInfoTrackerTest {
    // Request id of the default stats sample (0).
    private static final int REQUEST_ID = 0;
    // Parallelism of the job vertex under test, i.e. the number of subtasks (10).
    private static final int PARALLELISM = 10;
    // Job id passed to every tracker lookup in this class.
    private static final JobID JOB_ID = new JobID();
    // Default cache clean-up interval configured on the tracker builder.
    private static final Duration CLEAN_UP_INTERVAL = Duration.ofSeconds(60L);
    // Default stats refresh interval configured on the tracker builder.
    private static final Duration STATS_REFRESH_INTERVAL = Duration.ofSeconds(60L);
    // Start-to-end gap of the secondary stats samples (the ones created with request id 1).
    private static final Duration TIME_GAP = Duration.ofSeconds(60L);
    // Start-to-end gap of the default stats sample created in setUp().
    private static final Duration SMALL_TIME_GAP = Duration.ofMillis(1L);
    // Request timeout passed to the constructors of the testing coordinators.
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(10L);
    // Number of samples configured on the tracker builder (1).
    private static final int NUMBER_OF_SAMPLES = 1;
    // Stack depth used for thread-info samples and as the tracker's max thread-info depth (100).
    private static final int MAX_STACK_TRACE_DEPTH = 100;
    // Delay between samples configured on the tracker builder.
    private static final Duration DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50L);
    // Scheduled executor handed to the tracker builder; replaced in setUp() and shut down in
    // tearDown(). Static although it is re-created for every test.
    private static ScheduledExecutorService executor;
    // Job vertex under test, created in setUp() via createExecutionJobVertex().
    private ExecutionJobVertex executionJobVertex;
    // Current attempt ids of the vertex's subtasks, sorted by subtask index in setUp().
    private ExecutionAttemptID[] attemptIDS;
    // Default stats sample (request id 0, SMALL_TIME_GAP) created in setUp().
    private VertexThreadInfoStats threadInfoStatsDefaultSample;
    // State the vertices are switched to in createExecutionJobVertex().
    // NOTE(review): presumably injected by ParameterizedTestExtension from parameters() - confirm.
    @Parameter
    public ExecutionState executionState;

    /** Package-private no-arg constructor; performs no initialization. */
    VertexThreadInfoTrackerTest() {}

    /** Returns the execution states ({@code RUNNING}, {@code INITIALIZING}) used as parameters. */
    @Parameters(name = "executionState={0}")
    private static Collection<ExecutionState> parameters() {
        final ExecutionState[] states = {ExecutionState.RUNNING, ExecutionState.INITIALIZING};
        return Arrays.asList(states);
    }

    /**
     * Builds the per-test fixture: the job vertex, the attempt ids of its subtasks ordered by
     * subtask index, the default stats sample and a fresh single-threaded scheduled executor.
     */
    @BeforeEach
    void setUp() {
        executionJobVertex = createExecutionJobVertex();

        final ExecutionVertex[] taskVertices = executionJobVertex.getTaskVertices();
        final Comparator<ExecutionAttemptID> bySubtaskIndex =
                Comparator.comparingInt(ExecutionAttemptID::getSubtaskIndex);
        attemptIDS =
                Arrays.stream(taskVertices)
                        .map(vertex -> vertex.getCurrentExecutionAttempt().getAttemptId())
                        .sorted(bySubtaskIndex)
                        .toArray(ExecutionAttemptID[]::new);

        // The default sample needs executionJobVertex, so it must be created after it.
        threadInfoStatsDefaultSample = createThreadInfoStats(REQUEST_ID, SMALL_TIME_GAP);
        executor = Executors.newScheduledThreadPool(1);
    }

    /** Stops the executor created in {@code setUp()}, if there is one. */
    @AfterEach
    void tearDown() {
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
    }

    /**
     * Runs the initial request/verify cycle once at job-vertex level and once per subtask at
     * execution-vertex level, each time against a freshly created tracker.
     */
    @TestTemplate
    void testGetThreadInfoStats() throws Exception {
        // Job-vertex level.
        doInitialJobVertexRequestAndVerifyResult(createThreadInfoTracker());

        // Execution-vertex level: one new tracker per subtask.
        int subtask = 0;
        while (subtask < PARALLELISM) {
            final VertexThreadInfoTracker freshTracker = createThreadInfoTracker();
            doInitialExecutionVertexRequestAndVerifyResult(freshTracker, subtask);
            subtask++;
        }
    }

    /**
     * Verifies that repeated requests keep returning the first (default) sample: the second
     * sample registered with the coordinator must not surface, neither when the first request
     * is made at job-vertex level nor when it is made at execution-vertex level.
     */
    @TestTemplate
    void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
        final VertexThreadInfoStats unusedThreadInfoStats = createThreadInfoStats(1, TIME_GAP);

        // Request triggered at job-vertex level.
        final VertexThreadInfoTracker tracker =
                createThreadInfoTracker(
                        STATS_REFRESH_INTERVAL,
                        threadInfoStatsDefaultSample,
                        unusedThreadInfoStats);
        doInitialJobVertexRequestAndVerifyResult(tracker);

        // A second job-vertex request must still yield the default sample.
        final Optional<VertexThreadInfoStats> result =
                tracker.getJobVertexStats(JOB_ID, executionJobVertex);
        assertExpectedEqualsReceived(threadInfoStatsDefaultSample, result);

        // Execution-vertex lookups on the same tracker also yield the default sample.
        for (int subtaskIndex = 0; subtaskIndex < PARALLELISM; subtaskIndex++) {
            assertExpectedEqualsReceived(
                    generateThreadInfoStatsForExecutionVertex(
                            threadInfoStatsDefaultSample, attemptIDS[subtaskIndex]),
                    tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtaskIndex));
        }

        // Request triggered at execution-vertex level, one fresh tracker per subtask.
        for (int subtaskIndex = 0; subtaskIndex < PARALLELISM; subtaskIndex++) {
            final VertexThreadInfoTracker tracker1 =
                    createThreadInfoTracker(
                            STATS_REFRESH_INTERVAL,
                            threadInfoStatsDefaultSample,
                            unusedThreadInfoStats);
            doInitialExecutionVertexRequestAndVerifyResult(tracker1, subtaskIndex);

            // The expected stats are derived for this subtask's own attempt id.
            assertExpectedEqualsReceived(
                    generateThreadInfoStatsForExecutionVertex(
                            threadInfoStatsDefaultSample, attemptIDS[subtaskIndex]),
                    tracker1.getExecutionVertexStats(JOB_ID, executionJobVertex, subtaskIndex));
        }
    }

    /**
     * Job-vertex level refresh test. The initial sample is dated 10 seconds in the past while the
     * refresh interval is 1 second; after reading it once, the test waits for a removal
     * notification from the cache and then expects the second sample.
     */
    @TestTemplate
    void testJobVertexCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        final Duration shortRefreshInterval = Duration.ofMillis(1000L);

        final Instant tenSecondsAgo = Instant.now().minus(10L, ChronoUnit.SECONDS);
        final VertexThreadInfoStats initialThreadInfoStats =
                createThreadInfoStats(tenSecondsAgo, REQUEST_ID, Duration.ofMillis(5L));
        final VertexThreadInfoStats threadInfoStatsAfterRefresh =
                createThreadInfoStats(1, TIME_GAP);

        // The latch is counted down by the cache's removal listener.
        final CountDownLatch cacheRefreshed = new CountDownLatch(1);
        final Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats>
                jobVertexStatsCache =
                        createCache(CLEAN_UP_INTERVAL, new LatchRemovalListener<>(cacheRefreshed));
        final VertexThreadInfoTracker tracker =
                createThreadInfoTracker(
                        CLEAN_UP_INTERVAL,
                        shortRefreshInterval,
                        jobVertexStatsCache,
                        null,
                        initialThreadInfoStats,
                        threadInfoStatsAfterRefresh);

        // First lookup returns nothing; then wait for the result-available future.
        AssertionsForClassTypes.assertThat(tracker.getJobVertexStats(JOB_ID, executionJobVertex))
                .isNotPresent();
        tracker.getResultAvailableFuture().get();

        // The initial sample is now served.
        assertExpectedEqualsReceived(
                initialThreadInfoStats, tracker.getJobVertexStats(JOB_ID, executionJobVertex));

        // Block until the removal listener has fired.
        cacheRefreshed.await();

        final Optional<VertexThreadInfoStats> result =
                tracker.getJobVertexStats(JOB_ID, executionJobVertex);
        assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
    }

    /**
     * Execution-vertex level refresh test, repeated for every subtask with its own cache and
     * tracker. The initial sample is dated 10 seconds in the past while the refresh interval is
     * 1 second; after reading it once, the test waits for a removal notification from the cache
     * and then expects the second sample, restricted to the subtask's attempt id.
     */
    @TestTemplate
    void testExecutionVertexCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        final Duration shortRefreshInterval = Duration.ofMillis(1000L);

        final Instant tenSecondsAgo = Instant.now().minus(10L, ChronoUnit.SECONDS);
        final VertexThreadInfoStats initialThreadInfoStats =
                createThreadInfoStats(tenSecondsAgo, REQUEST_ID, Duration.ofMillis(5L));
        final VertexThreadInfoStats threadInfoStatsAfterRefresh =
                createThreadInfoStats(1, TIME_GAP);

        for (int subtaskIndex = 0; subtaskIndex < PARALLELISM; subtaskIndex++) {
            final ExecutionAttemptID attemptId = attemptIDS[subtaskIndex];

            // The latch is counted down by the cache's removal listener.
            final CountDownLatch cacheRefreshed = new CountDownLatch(1);
            final Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats>
                    executionVertexStatsCache =
                            createCache(
                                    CLEAN_UP_INTERVAL,
                                    new LatchRemovalListener<>(cacheRefreshed));
            final VertexThreadInfoTracker tracker =
                    createThreadInfoTracker(
                            CLEAN_UP_INTERVAL,
                            shortRefreshInterval,
                            null,
                            executionVertexStatsCache,
                            initialThreadInfoStats,
                            threadInfoStatsAfterRefresh);

            // First lookup returns nothing; then wait for the result-available future.
            AssertionsForClassTypes.assertThat(
                            tracker.getExecutionVertexStats(
                                    JOB_ID, executionJobVertex, subtaskIndex))
                    .isNotPresent();
            tracker.getResultAvailableFuture().get();

            // The initial sample is now served for this subtask.
            assertExpectedEqualsReceived(
                    generateThreadInfoStatsForExecutionVertex(initialThreadInfoStats, attemptId),
                    tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtaskIndex));

            // Block until the removal listener has fired.
            cacheRefreshed.await();

            final Optional<VertexThreadInfoStats> result =
                    tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtaskIndex);
            assertExpectedEqualsReceived(
                    generateThreadInfoStatsForExecutionVertex(
                            threadInfoStatsAfterRefresh, attemptId),
                    result);
        }
    }

    /**
     * Verifies that, while a job-vertex request is still pending on the coordinator,
     * execution-vertex lookups do not trigger further requests (the trigger counter stays at
     * one), and that the execution-vertex stats are present once the pending future completes.
     */
    @TestTemplate
    void testExecutionVertexShouldBeIgnoredWhenJobVertexIsPending() throws Exception {
        final CompletableFuture<VertexThreadInfoStats> pendingStats = new CompletableFuture<>();
        final TestingBlockingAndCountableCoordinator countingCoordinator =
                new TestingBlockingAndCountableCoordinator(pendingStats);
        final VertexThreadInfoTracker tracker =
                createThreadInfoTracker(
                        CLEAN_UP_INTERVAL, STATS_REFRESH_INTERVAL, null, null, countingCoordinator);

        // The job-vertex lookup returns nothing and triggers exactly one request.
        AssertionsForClassTypes.assertThat(tracker.getJobVertexStats(JOB_ID, executionJobVertex))
                .isNotPresent();
        AssertionsForClassTypes.assertThat(countingCoordinator.getTriggerCounter()).isOne();

        // While that request is pending, execution-vertex lookups add no further triggers.
        for (int subtask = 0; subtask < PARALLELISM; subtask++) {
            AssertionsForClassTypes.assertThat(
                            tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtask))
                    .isNotPresent();
            AssertionsForClassTypes.assertThat(tracker.getResultAvailableFuture())
                    .isNotCompleted();
            AssertionsForClassTypes.assertThat(countingCoordinator.getTriggerCounter()).isOne();
        }

        // Release the pending request and wait for the result.
        pendingStats.complete(threadInfoStatsDefaultSample);
        tracker.getResultAvailableFuture().get();

        // Every subtask now has stats, still with a single trigger overall.
        for (int subtask = 0; subtask < PARALLELISM; subtask++) {
            AssertionsForClassTypes.assertThat(
                            tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtask))
                    .isPresent();
            AssertionsForClassTypes.assertThat(countingCoordinator.getTriggerCounter()).isOne();
        }
    }

    /**
     * Uses a 1 ms clean-up interval and checks, at job-vertex level and then per subtask at
     * execution-vertex level, that a lookup made after the cache reported a removal returns
     * nothing.
     */
    @TestTemplate
    void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
        final Duration shortCleanUpInterval = Duration.ofMillis(1L);

        // Job-vertex level.
        final CountDownLatch cacheExpired = new CountDownLatch(1);
        final Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats>
                jobVertexStatsCache =
                        createCache(shortCleanUpInterval, new LatchRemovalListener<>(cacheExpired));
        final VertexThreadInfoTracker tracker =
                createThreadInfoTracker(
                        shortCleanUpInterval,
                        STATS_REFRESH_INTERVAL,
                        jobVertexStatsCache,
                        null,
                        threadInfoStatsDefaultSample);

        AssertionsForClassTypes.assertThat(tracker.getJobVertexStats(JOB_ID, executionJobVertex))
                .isNotPresent();
        // Wait for the removal listener to fire once.
        cacheExpired.await();
        AssertionsForClassTypes.assertThat(tracker.getJobVertexStats(JOB_ID, executionJobVertex))
                .isNotPresent();

        // Execution-vertex level, with a dedicated cache and tracker per subtask.
        for (int subtask = 0; subtask < PARALLELISM; subtask++) {
            final CountDownLatch executionCacheExpired = new CountDownLatch(1);
            final Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats>
                    executionVertexStatsCache =
                            createCache(
                                    shortCleanUpInterval,
                                    new LatchRemovalListener<>(executionCacheExpired));
            final VertexThreadInfoTracker executionTracker =
                    createThreadInfoTracker(
                            shortCleanUpInterval,
                            STATS_REFRESH_INTERVAL,
                            null,
                            executionVertexStatsCache,
                            threadInfoStatsDefaultSample);

            AssertionsForClassTypes.assertThat(
                            executionTracker.getExecutionVertexStats(
                                    JOB_ID, executionJobVertex, subtask))
                    .isNotPresent();
            executionCacheExpired.await();
            AssertionsForClassTypes.assertThat(
                            executionTracker.getExecutionVertexStats(
                                    JOB_ID, executionJobVertex, subtask))
                    .isNotPresent();
        }
    }

    /**
     * Verifies that an explicit {@code cleanUpStatsCache()} call on a tracker with the default
     * (60 s) clean-up interval leaves the job-vertex and execution-vertex stats available.
     */
    @TestTemplate
    void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
        final VertexThreadInfoTracker tracker = createThreadInfoTracker();
        doInitialJobVertexRequestAndVerifyResult(tracker);

        tracker.cleanUpStatsCache();

        // The default sample is still returned at job-vertex level ...
        final Optional<VertexThreadInfoStats> jobVertexStats =
                tracker.getJobVertexStats(JOB_ID, executionJobVertex);
        assertExpectedEqualsReceived(threadInfoStatsDefaultSample, jobVertexStats);

        // ... and for every subtask at execution-vertex level.
        for (int subtask = 0; subtask < PARALLELISM; subtask++) {
            final VertexThreadInfoStats expected =
                    generateThreadInfoStatsForExecutionVertex(
                            threadInfoStatsDefaultSample, attemptIDS[subtask]);
            assertExpectedEqualsReceived(
                    expected,
                    tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtask));
        }
    }

    /**
     * Verifies that after {@code shutDown()} neither job-vertex nor execution-vertex lookups
     * return stats, on the first attempt as well as on a repeated one.
     */
    @TestTemplate
    void testShutDown() throws Exception {
        final VertexThreadInfoTracker tracker = createThreadInfoTracker();
        doInitialJobVertexRequestAndVerifyResult(tracker);

        tracker.shutDown();

        // Two rounds: the second round checks that the first lookups produced no new result.
        for (int round = 0; round < 2; round++) {
            AssertionsForClassTypes.assertThat(
                            tracker.getJobVertexStats(JOB_ID, executionJobVertex))
                    .isNotPresent();
            AssertionsForClassTypes.assertThat(
                            tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, 0))
                    .isNotPresent();
        }
    }

    /**
     * Creates a cache with concurrency level 1 whose entries expire {@code cleanUpInterval}
     * after their last access and whose removals are reported to {@code removalListener}.
     *
     * @param cleanUpInterval expire-after-access duration, applied with millisecond precision
     * @param removalListener listener registered on the cache
     * @return the newly built cache
     */
    private <K> Cache<K, VertexThreadInfoStats> createCache(Duration cleanUpInterval, RemovalListener<K, VertexThreadInfoStats> removalListener) {
        final long expiryMillis = cleanUpInterval.toMillis();
        final CacheBuilder<Object, Object> builder =
                CacheBuilder.newBuilder()
                        .concurrencyLevel(1)
                        .expireAfterAccess(expiryMillis, TimeUnit.MILLISECONDS);
        return builder.removalListener(removalListener).build();
    }

    /**
     * Issues the first job-vertex request on {@code tracker}, expects it to return nothing, waits
     * for the result-available future and then checks that the default sample is returned at
     * job-vertex level and for every subtask at execution-vertex level.
     *
     * @param tracker tracker under test
     * @throws InterruptedException if waiting for the result is interrupted
     * @throws ExecutionException if the result-available future completed exceptionally
     */
    private void doInitialJobVertexRequestAndVerifyResult(VertexThreadInfoTracker tracker) throws InterruptedException, ExecutionException {
        // The first lookup has nothing to return yet.
        AssertionsForClassTypes.assertThat(tracker.getJobVertexStats(JOB_ID, executionJobVertex))
                .isNotPresent();

        tracker.getResultAvailableFuture().get();

        final Optional<VertexThreadInfoStats> jobVertexStats =
                tracker.getJobVertexStats(JOB_ID, executionJobVertex);
        assertExpectedEqualsReceived(threadInfoStatsDefaultSample, jobVertexStats);

        for (int subtask = 0; subtask < PARALLELISM; subtask++) {
            final VertexThreadInfoStats expected =
                    generateThreadInfoStatsForExecutionVertex(
                            threadInfoStatsDefaultSample, attemptIDS[subtask]);
            assertExpectedEqualsReceived(
                    expected,
                    tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtask));
        }
    }

    /**
     * Issues the first execution-vertex request for {@code subtaskIndex} on {@code tracker},
     * expects it to return nothing, waits for the result-available future and then checks that
     * the default sample, restricted to that subtask's attempt id, is returned.
     *
     * @param tracker tracker under test
     * @param subtaskIndex index of the subtask to query
     * @throws InterruptedException if waiting for the result is interrupted
     * @throws ExecutionException if the result-available future completed exceptionally
     */
    private void doInitialExecutionVertexRequestAndVerifyResult(VertexThreadInfoTracker tracker, int subtaskIndex) throws InterruptedException, ExecutionException {
        // The first lookup has nothing to return yet.
        AssertionsForClassTypes.assertThat(
                        tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtaskIndex))
                .isNotPresent();

        tracker.getResultAvailableFuture().get();

        final VertexThreadInfoStats expected =
                generateThreadInfoStatsForExecutionVertex(
                        threadInfoStatsDefaultSample, attemptIDS[subtaskIndex]);
        assertExpectedEqualsReceived(
                expected,
                tracker.getExecutionVertexStats(JOB_ID, executionJobVertex, subtaskIndex));
    }

    /**
     * Single-attempt convenience wrapper around
     * {@code generateThreadInfoStatsForExecutionVertices}.
     *
     * @param jobVertexStats stats to restrict
     * @param executionAttemptID the only attempt id to keep
     * @return the stats restricted to {@code executionAttemptID}
     */
    private static VertexThreadInfoStats generateThreadInfoStatsForExecutionVertex(VertexThreadInfoStats jobVertexStats, ExecutionAttemptID executionAttemptID) {
        final Set<ExecutionAttemptID> singleAttempt = Collections.singleton(executionAttemptID);
        return generateThreadInfoStatsForExecutionVertices(jobVertexStats, singleAttempt);
    }

    /**
     * Restricts {@code jobVertexStats} to the given attempt ids.
     *
     * @param jobVertexStats stats to restrict
     * @param executionAttemptIDS attempt ids whose samples are kept
     * @return {@code jobVertexStats} itself when the ids equal its sample key set; otherwise a
     *     new stats object with the same request id, start time and end time that holds only
     *     the samples of the given attempt ids
     */
    private static VertexThreadInfoStats generateThreadInfoStatsForExecutionVertices(VertexThreadInfoStats jobVertexStats, Set<ExecutionAttemptID> executionAttemptIDS) {
        final boolean coversAllSubtasks =
                executionAttemptIDS.equals(jobVertexStats.getSamplesBySubtask().keySet());
        if (coversAllSubtasks) {
            // Nothing to filter out, so the original object is handed back.
            return jobVertexStats;
        }

        final int requestId = jobVertexStats.getRequestId();
        final long startTime = jobVertexStats.getStartTime();
        final long endTime = jobVertexStats.getEndTime();
        return new VertexThreadInfoStats(
                requestId,
                startTime,
                endTime,
                jobVertexStats.getSamplesBySubtask().entrySet().stream()
                        .filter(sample -> executionAttemptIDS.contains(sample.getKey()))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    /**
     * Asserts that {@code receivedOptional} is present and matches {@code expected} in request
     * id, end time and number of subtasks, and that every received subtask has at least one
     * sample. Start time and sample contents are not compared.
     *
     * @param expected the stats the tracker is expected to return
     * @param receivedOptional the lookup result under test
     */
    private static void assertExpectedEqualsReceived(VertexThreadInfoStats expected, Optional<VertexThreadInfoStats> receivedOptional) {
        AssertionsForClassTypes.assertThat(receivedOptional).isPresent();
        final VertexThreadInfoStats received = receivedOptional.get();

        // AssertJ convention: the value under test goes into assertThat(), the expectation
        // into isEqualTo(), so failure messages label the two correctly.
        AssertionsForClassTypes.assertThat(received.getRequestId())
                .isEqualTo(expected.getRequestId());
        AssertionsForClassTypes.assertThat(received.getEndTime())
                .isEqualTo(expected.getEndTime());
        AssertionsForClassTypes.assertThat(received.getNumberOfSubtasks())
                .isEqualTo(expected.getNumberOfSubtasks());

        for (Collection<ThreadInfoSample> samples : received.getSamplesBySubtask().values()) {
            AssertionsForClassTypes.assertThat(samples.isEmpty()).isFalse();
        }
    }

    /**
     * Creates a tracker with the default refresh interval whose coordinator serves only the
     * default stats sample.
     */
    private VertexThreadInfoTracker createThreadInfoTracker() {
        final VertexThreadInfoStats onlySample = threadInfoStatsDefaultSample;
        return createThreadInfoTracker(STATS_REFRESH_INTERVAL, onlySample);
    }

    /**
     * Creates a tracker with the default clean-up interval and no explicitly supplied caches.
     *
     * @param statsRefreshInterval refresh interval configured on the tracker
     * @param stats samples the testing coordinator hands out
     * @return the configured tracker
     */
    private VertexThreadInfoTracker createThreadInfoTracker(Duration statsRefreshInterval, VertexThreadInfoStats ... stats) {
        // Both cache arguments are passed as null.
        return createThreadInfoTracker(
                CLEAN_UP_INTERVAL,
                statsRefreshInterval,
                null,
                null,
                stats);
    }

    /**
     * Creates a tracker backed by a {@code TestingThreadInfoRequestCoordinator} that runs on the
     * calling thread and hands out the given samples.
     *
     * @param cleanUpInterval clean-up interval configured on the tracker
     * @param statsRefreshInterval refresh interval configured on the tracker
     * @param jobVertexStatsCache job-vertex cache to use, may be {@code null}
     * @param executionVertexStatsCache execution-vertex cache to use, may be {@code null}
     * @param stats samples the testing coordinator hands out
     * @return the configured tracker
     */
    private VertexThreadInfoTracker createThreadInfoTracker(Duration cleanUpInterval, Duration statsRefreshInterval, @Nullable Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats> jobVertexStatsCache, @Nullable Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats> executionVertexStatsCache, VertexThreadInfoStats ... stats) {
        // Direct executor: submitted runnables execute immediately on the caller's thread.
        final Executor directExecutor = Runnable::run;
        final ThreadInfoRequestCoordinator testingCoordinator =
                new TestingThreadInfoRequestCoordinator(directExecutor, REQUEST_TIMEOUT, stats);
        return createThreadInfoTracker(
                cleanUpInterval,
                statsRefreshInterval,
                jobVertexStatsCache,
                executionVertexStatsCache,
                testingCoordinator);
    }

    /**
     * Builds a tracker through {@code VertexThreadInfoTrackerBuilder} using the mock resource
     * manager gateway, the shared test executor and the given settings.
     *
     * @param cleanUpInterval clean-up interval configured on the tracker
     * @param statsRefreshInterval refresh interval configured on the tracker
     * @param jobVertexStatsCache job-vertex cache to use, may be {@code null}
     * @param executionVertexStatsCache execution-vertex cache to use, may be {@code null}
     * @param coordinator coordinator the tracker sends its requests to
     * @return the configured tracker
     */
    private VertexThreadInfoTracker createThreadInfoTracker(Duration cleanUpInterval, Duration statsRefreshInterval, @Nullable Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats> jobVertexStatsCache, @Nullable Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats> executionVertexStatsCache, ThreadInfoRequestCoordinator coordinator) {
        return VertexThreadInfoTrackerBuilder.newBuilder(
                        VertexThreadInfoTrackerTest::createMockResourceManagerGateway,
                        executor,
                        TestingUtils.TIMEOUT)
                .setCoordinator(coordinator)
                .setCleanUpInterval(cleanUpInterval)
                .setNumSamples(NUMBER_OF_SAMPLES)
                .setStatsRefreshInterval(statsRefreshInterval)
                .setDelayBetweenSamples(DELAY_BETWEEN_SAMPLES)
                .setMaxThreadInfoDepth(MAX_STACK_TRACE_DEPTH)
                .setJobVertexStatsCache(jobVertexStatsCache)
                .setExecutionVertexStatsCache(executionVertexStatsCache)
                .build();
    }

    /**
     * Creates stats whose start time is the current instant.
     *
     * @param requestId request id stored in the stats
     * @param timeGap distance between start time and end time
     * @return stats with one sample per task vertex
     */
    private VertexThreadInfoStats createThreadInfoStats(int requestId, Duration timeGap) {
        final Instant now = Instant.now();
        return createThreadInfoStats(now, requestId, timeGap);
    }

    /**
     * Creates stats that hold exactly one thread-info sample, taken from the calling thread, for
     * the current attempt of every task vertex of the job vertex under test.
     *
     * @param startTime start time stored in the stats
     * @param requestId request id stored in the stats
     * @param timeGap distance between start time and end time
     * @return the assembled stats
     * @throws IllegalStateException if no thread-info sample could be created for the calling
     *     thread
     */
    private VertexThreadInfoStats createThreadInfoStats(Instant startTime, int requestId, Duration timeGap) {
        final Instant endTime = startTime.plus(timeGap);

        // Value type is Collection to match the sample map carried by VertexThreadInfoStats.
        final Map<ExecutionAttemptID, Collection<ThreadInfoSample>> samples = new HashMap<>();
        for (ExecutionVertex vertex : executionJobVertex.getTaskVertices()) {
            // A fresh sample is taken per vertex, as before; samples are not shared.
            final Optional<ThreadInfoSample> threadInfoSample =
                    JvmUtils.createThreadInfoSample(
                            Thread.currentThread().getId(), MAX_STACK_TRACE_DEPTH);
            // The calling thread is alive, so a sample is expected to be available.
            Preconditions.checkState(
                    threadInfoSample.isPresent(), "The threadInfoSample should be present.");
            samples.put(
                    vertex.getCurrentExecutionAttempt().getAttemptId(),
                    Collections.singletonList(threadInfoSample.get()));
        }

        return new VertexThreadInfoStats(
                requestId, startTime.toEpochMilli(), endTime.toEpochMilli(), samples);
    }

    /**
     * Builds a streaming job with a single vertex named "testVertex", starts scheduling it and
     * switches all vertices into the parameterized {@code executionState}.
     *
     * @return the execution job vertex of the created job vertex
     * @throws RuntimeException wrapping any exception raised while building the fixture,
     *     including the {@link IllegalArgumentException} for an unsupported state
     */
    private ExecutionJobVertex createExecutionJobVertex() {
        try {
            final JobVertex vertex = new JobVertex("testVertex");
            vertex.setParallelism(PARALLELISM);
            vertex.setInvokableClass(AbstractInvokable.class);

            final DefaultScheduler defaultScheduler =
                    SchedulerTestingUtils.createScheduler(
                            JobGraphTestUtils.streamingJobGraph(vertex),
                            ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                            new DirectScheduledExecutorService());
            final ExecutionGraph graph = defaultScheduler.getExecutionGraph();
            defaultScheduler.startScheduling();

            // Only the two parameterized states are supported.
            switch (executionState) {
                case RUNNING:
                    ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
                    break;
                case INITIALIZING:
                    ExecutionGraphTestUtils.switchAllVerticesToInitializing(graph);
                    break;
                default:
                    throw new IllegalArgumentException("Just support RUNNING and INITIALIZING.");
            }

            return defaultScheduler.getExecutionJobVertex(vertex.getID());
        } catch (Exception cause) {
            throw new RuntimeException("Failed to create ExecutionJobVertex.", cause);
        }
    }

    /**
     * Creates an already completed future holding a {@code TestingResourceManagerGateway} whose
     * task-executor gateway lookup completes with {@code null} for every resource id.
     */
    private static CompletableFuture<ResourceManagerGateway> createMockResourceManagerGateway() {
        final TestingResourceManagerGateway gateway = new TestingResourceManagerGateway();
        final Function<ResourceID, CompletableFuture<TaskExecutorThreadInfoGateway>>
                nullGatewayLookup = ignoredResourceId -> CompletableFuture.completedFuture(null);
        gateway.setRequestTaskExecutorGatewayFunction(nullGatewayLookup);
        return CompletableFuture.completedFuture(gateway);
    }

    private static class LatchRemovalListener<K, V>
    implements RemovalListener<K, V> {
        private final CountDownLatch latch;

        private LatchRemovalListener(CountDownLatch latch) {
            this.latch = latch;
        }

        public void onRemoval(@Nonnull RemovalNotification<K, V> removalNotification) {
            this.latch.countDown();
        }
    }

    private static class TestingBlockingAndCountableCoordinator
    extends ThreadInfoRequestCoordinator {
        private final CompletableFuture<VertexThreadInfoStats> blockingFuture;
        private final AtomicLong triggerCounter;

        TestingBlockingAndCountableCoordinator(CompletableFuture<VertexThreadInfoStats> blockingFuture) {
            super(Runnable::run, REQUEST_TIMEOUT);
            this.blockingFuture = blockingFuture;
            this.triggerCounter = new AtomicLong(0L);
        }

        public long getTriggerCounter() {
            return this.triggerCounter.get();
        }

        public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> executionsWithGateways, int ignored2, Duration ignored3, int ignored4) {
            this.triggerCounter.getAndIncrement();
            return this.blockingFuture.thenApply(stats -> VertexThreadInfoTrackerTest.generateThreadInfoStatsForExecutionVertices(stats, (Set)executionsWithGateways.keySet().iterator().next()));
        }
    }

    private static class TestingThreadInfoRequestCoordinator
    extends ThreadInfoRequestCoordinator {
        private final VertexThreadInfoStats[] jobVertexThreadInfoStats;
        private int counter = 0;

        TestingThreadInfoRequestCoordinator(Executor executor, Duration requestTimeout, VertexThreadInfoStats ... jobVertexThreadInfoStats) {
            super(executor, requestTimeout);
            this.jobVertexThreadInfoStats = jobVertexThreadInfoStats;
        }

        private VertexThreadInfoStats getVertexThreadInfoStats() {
            return this.jobVertexThreadInfoStats[this.counter++ % this.jobVertexThreadInfoStats.length];
        }

        public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> executionsWithGateways, int ignored2, Duration ignored3, int ignored4) {
            AssertionsForClassTypes.assertThat((executionsWithGateways.size() == 1 ? 1 : 0) != 0).isTrue();
            ImmutableSet<ExecutionAttemptID> executionAttemptIDS = executionsWithGateways.keySet().iterator().next();
            return CompletableFuture.completedFuture(VertexThreadInfoTrackerTest.generateThreadInfoStatsForExecutionVertices(this.getVertexThreadInfoStats(), executionAttemptIDS));
        }
    }
}

