package org.apache.tez.dag.app.dag;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/dag/TestStateChangeNotifier.class */
public class TestStateChangeNotifier {

    /* loaded from: input_file:org/apache/tez/dag/app/dag/TestStateChangeNotifier$StateChangeNotifierForTest.class */
    public static class StateChangeNotifierForTest extends StateChangeNotifier {
        private static final Logger LOG = LoggerFactory.getLogger(StateChangeNotifierForTest.class);
        AtomicInteger count;
        AtomicInteger totalCount;

        public StateChangeNotifierForTest(DAG dag) {
            super(dag);
            this.count = new AtomicInteger(0);
            this.totalCount = new AtomicInteger(0);
        }

        public void reset() {
            this.count.set(0);
            this.totalCount.set(0);
        }

        protected void processedEventFromQueue() {
            while (this.count.get() <= 0) {
                try {
                    Thread.sleep(10L);
                    LOG.info("sleep to wait for available events");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            synchronized (this.count) {
                if (this.count.decrementAndGet() == 0) {
                    this.count.notifyAll();
                }
            }
        }

        protected void addedEventToQueue() {
            this.totalCount.incrementAndGet();
            synchronized (this.count) {
                if (this.count.incrementAndGet() > 0) {
                    try {
                        this.count.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    @Test(timeout = 5000)
    public void testEventsOnRegistration() {
        TezDAGID tezDAGID = TezDAGID.getInstance("1", 1, 1);
        Vertex createMockVertex = createMockVertex(tezDAGID, 1);
        Vertex createMockVertex2 = createMockVertex(tezDAGID, 2);
        Vertex createMockVertex3 = createMockVertex(tezDAGID, 3);
        StateChangeNotifierForTest stateChangeNotifierForTest = new StateChangeNotifierForTest(createMockDag(tezDAGID, createMockVertex, createMockVertex2, createMockVertex3));
        notifyTracker(stateChangeNotifierForTest, createMockVertex, VertexState.RUNNING);
        VertexStateUpdateListener vertexStateUpdateListener = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        VertexStateUpdateListener vertexStateUpdateListener2 = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        VertexStateUpdateListener vertexStateUpdateListener3 = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        VertexStateUpdateListener vertexStateUpdateListener4 = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), null, vertexStateUpdateListener);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), EnumSet.allOf(VertexState.class), vertexStateUpdateListener2);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), EnumSet.of(VertexState.RUNNING), vertexStateUpdateListener3);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), EnumSet.of(VertexState.SUCCEEDED), vertexStateUpdateListener4);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(VertexStateUpdate.class);
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener, Mockito.times(1))).onStateUpdated((VertexStateUpdate) forClass.capture());
        Assert.assertEquals(VertexState.RUNNING, ((VertexStateUpdate) forClass.getValue()).getVertexState());
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener2, Mockito.times(1))).onStateUpdated((VertexStateUpdate) forClass.capture());
        Assert.assertEquals(VertexState.RUNNING, ((VertexStateUpdate) forClass.getValue()).getVertexState());
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener3, Mockito.times(1))).onStateUpdated((VertexStateUpdate) forClass.capture());
        Assert.assertEquals(VertexState.RUNNING, ((VertexStateUpdate) forClass.getValue()).getVertexState());
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener4, Mockito.never())).onStateUpdated((VertexStateUpdate) Mockito.any());
        stateChangeNotifierForTest.reset();
        VertexStateUpdateListener vertexStateUpdateListener5 = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex2.getName(), null, vertexStateUpdateListener5);
        Assert.assertEquals(0L, stateChangeNotifierForTest.totalCount.get());
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener5, Mockito.never())).onStateUpdated((VertexStateUpdate) Mockito.any());
        stateChangeNotifierForTest.stateChanged(createMockVertex3.getVertexId(), new VertexStateUpdateParallelismUpdated(createMockVertex3.getName(), 23, -1));
        VertexStateUpdateListener vertexStateUpdateListener6 = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex3.getName(), null, vertexStateUpdateListener6);
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener6, Mockito.times(1))).onStateUpdated((VertexStateUpdate) forClass.capture());
        Assert.assertEquals(VertexState.PARALLELISM_UPDATED, ((VertexStateUpdate) forClass.getValue()).getVertexState());
    }

    @Test(timeout = 5000)
    public void testSimpleStateUpdates() {
        TezDAGID tezDAGID = TezDAGID.getInstance("1", 1, 1);
        Vertex createMockVertex = createMockVertex(tezDAGID, 1);
        StateChangeNotifierForTest stateChangeNotifierForTest = new StateChangeNotifierForTest(createMockDag(tezDAGID, createMockVertex));
        VertexStateUpdateListener vertexStateUpdateListener = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), null, vertexStateUpdateListener);
        ArrayList newArrayList = Lists.newArrayList(new VertexState[]{VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.RUNNING, VertexState.SUCCEEDED});
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            notifyTracker(stateChangeNotifierForTest, createMockVertex, (VertexState) it.next());
        }
        ArgumentCaptor forClass = ArgumentCaptor.forClass(VertexStateUpdate.class);
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener, Mockito.times(newArrayList.size()))).onStateUpdated((VertexStateUpdate) forClass.capture());
        List allValues = forClass.getAllValues();
        Iterator it2 = newArrayList.iterator();
        for (int i = 0; i < newArrayList.size(); i++) {
            Assert.assertEquals(it2.next(), ((VertexStateUpdate) allValues.get(i)).getVertexState());
        }
    }

    @Test(timeout = 5000)
    public void testDuplicateRegistration() {
        TezDAGID tezDAGID = TezDAGID.getInstance("1", 1, 1);
        Vertex createMockVertex = createMockVertex(tezDAGID, 1);
        StateChangeNotifierForTest stateChangeNotifierForTest = new StateChangeNotifierForTest(createMockDag(tezDAGID, createMockVertex));
        VertexStateUpdateListener vertexStateUpdateListener = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), null, vertexStateUpdateListener);
        try {
            stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), null, vertexStateUpdateListener);
            Assert.fail("Expecting an error from duplicate registrations of the same listener");
        } catch (TezUncheckedException e) {
        }
    }

    @Test(timeout = 5000)
    public void testSpecificStateUpdates() {
        TezDAGID tezDAGID = TezDAGID.getInstance("1", 1, 1);
        Vertex createMockVertex = createMockVertex(tezDAGID, 1);
        StateChangeNotifierForTest stateChangeNotifierForTest = new StateChangeNotifierForTest(createMockDag(tezDAGID, createMockVertex));
        VertexStateUpdateListener vertexStateUpdateListener = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED), vertexStateUpdateListener);
        ArrayList newArrayList = Lists.newArrayList(new VertexState[]{VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.RUNNING, VertexState.SUCCEEDED});
        ArrayList newArrayList2 = Lists.newArrayList(new VertexState[]{VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.RUNNING, VertexState.SUCCEEDED});
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            notifyTracker(stateChangeNotifierForTest, createMockVertex, (VertexState) it.next());
        }
        ArgumentCaptor forClass = ArgumentCaptor.forClass(VertexStateUpdate.class);
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener, Mockito.times(newArrayList2.size()))).onStateUpdated((VertexStateUpdate) forClass.capture());
        List allValues = forClass.getAllValues();
        Iterator it2 = newArrayList2.iterator();
        for (int i = 0; i < newArrayList2.size(); i++) {
            Assert.assertEquals(it2.next(), ((VertexStateUpdate) allValues.get(i)).getVertexState());
        }
    }

    @Test(timeout = 5000)
    public void testUnregister() {
        TezDAGID tezDAGID = TezDAGID.getInstance("1", 1, 1);
        Vertex createMockVertex = createMockVertex(tezDAGID, 1);
        StateChangeNotifierForTest stateChangeNotifierForTest = new StateChangeNotifierForTest(createMockDag(tezDAGID, createMockVertex));
        VertexStateUpdateListener vertexStateUpdateListener = (VertexStateUpdateListener) Mockito.mock(VertexStateUpdateListener.class);
        stateChangeNotifierForTest.registerForVertexUpdates(createMockVertex.getName(), null, vertexStateUpdateListener);
        ArrayList<VertexState> newArrayList = Lists.newArrayList(new VertexState[]{VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED, VertexState.KILLED, VertexState.RUNNING, VertexState.SUCCEEDED});
        int i = 0;
        for (VertexState vertexState : newArrayList) {
            if (i == 3) {
                stateChangeNotifierForTest.unregisterForVertexUpdates(createMockVertex.getName(), vertexStateUpdateListener);
            }
            notifyTracker(stateChangeNotifierForTest, createMockVertex, vertexState);
            i++;
        }
        ArgumentCaptor forClass = ArgumentCaptor.forClass(VertexStateUpdate.class);
        ((VertexStateUpdateListener) Mockito.verify(vertexStateUpdateListener, Mockito.times(3))).onStateUpdated((VertexStateUpdate) forClass.capture());
        List allValues = forClass.getAllValues();
        Iterator it = newArrayList.iterator();
        for (int i2 = 0; i2 < 3; i2++) {
            Assert.assertEquals(it.next(), ((VertexStateUpdate) allValues.get(i2)).getVertexState());
        }
    }

    private DAG createMockDag(TezDAGID tezDAGID, Vertex... vertexArr) {
        DAG dag = (DAG) Mockito.mock(DAG.class);
        ((DAG) Mockito.doReturn(tezDAGID).when(dag)).getID();
        for (Vertex vertex : vertexArr) {
            String name = vertex.getName();
            TezVertexID vertexId = vertex.getVertexId();
            ((DAG) Mockito.doReturn(vertex).when(dag)).getVertex(name);
            ((DAG) Mockito.doReturn(vertex).when(dag)).getVertex(vertexId);
        }
        return dag;
    }

    private Vertex createMockVertex(TezDAGID tezDAGID, int i) {
        TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, i);
        Vertex vertex = (Vertex) Mockito.mock(Vertex.class);
        ((Vertex) Mockito.doReturn(tezVertexID).when(vertex)).getVertexId();
        ((Vertex) Mockito.doReturn("vertex" + i).when(vertex)).getName();
        return vertex;
    }

    private void notifyTracker(StateChangeNotifier stateChangeNotifier, Vertex vertex, VertexState vertexState) {
        stateChangeNotifier.stateChanged(vertex.getVertexId(), new VertexStateUpdate(vertex.getName(), vertexState));
    }
}
