package org.apache.tez.dag.app;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/tez/dag/app/TestTaskCommunicatorManager1.class */
public class TestTaskCommunicatorManager1 {
    private ApplicationId appId;
    private ApplicationAttemptId appAttemptId;
    private AppContext appContext;
    Credentials credentials;
    AMContainerMap amContainerMap;
    EventHandler eventHandler;
    DAG dag;
    TaskCommunicatorManager taskAttemptListener;
    ContainerTask containerTask;
    AMContainerTask amContainerTask;
    TaskSpec taskSpec;
    TezVertexID vertexID;
    TezTaskID taskID;
    TezTaskAttemptID taskAttemptID;

    /* loaded from: input_file:org/apache/tez/dag/app/TestTaskCommunicatorManager1$TaskCommunicatorManagerInterfaceImplForTest.class */
    private static class TaskCommunicatorManagerInterfaceImplForTest extends TaskCommunicatorManager {
        public TaskCommunicatorManagerInterfaceImplForTest(AppContext appContext, TaskHeartbeatHandler taskHeartbeatHandler, ContainerHeartbeatHandler containerHeartbeatHandler, List<NamedEntityDescriptor> list) throws TezException {
            super(appContext, taskHeartbeatHandler, containerHeartbeatHandler, list);
        }

        TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
            return new TezTaskCommunicatorImplForTest(taskCommunicatorContext);
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/TestTaskCommunicatorManager1$TezTaskCommunicatorImplForTest.class */
    private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl {
        public TezTaskCommunicatorImplForTest(TaskCommunicatorContext taskCommunicatorContext) {
            super(taskCommunicatorContext);
        }

        protected void startRpcServer() {
        }

        protected void stopRpcServer() {
        }
    }

    @Before
    public void setUp() throws TezException {
        this.appId = ApplicationId.newInstance(1000L, 1);
        this.appAttemptId = ApplicationAttemptId.newInstance(this.appId, 1);
        this.dag = (DAG) Mockito.mock(DAG.class);
        this.vertexID = TezVertexID.getInstance(TezDAGID.getInstance(this.appId, 1), 1);
        this.taskID = TezTaskID.getInstance(this.vertexID, 1);
        this.taskAttemptID = TezTaskAttemptID.getInstance(this.taskID, 1);
        this.credentials = new Credentials();
        this.amContainerMap = (AMContainerMap) Mockito.mock(AMContainerMap.class);
        HashMap hashMap = new HashMap();
        this.eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        MockClock mockClock = new MockClock();
        this.appContext = (AppContext) Mockito.mock(AppContext.class);
        ((AppContext) Mockito.doReturn(this.eventHandler).when(this.appContext)).getEventHandler();
        ((AppContext) Mockito.doReturn(this.dag).when(this.appContext)).getCurrentDAG();
        ((AppContext) Mockito.doReturn(hashMap).when(this.appContext)).getApplicationACLs();
        ((AppContext) Mockito.doReturn(this.amContainerMap).when(this.appContext)).getAllContainers();
        ((AppContext) Mockito.doReturn(mockClock).when(this.appContext)).getClock();
        ((AppContext) Mockito.doReturn(this.appAttemptId).when(this.appContext)).getApplicationAttemptId();
        ((AppContext) Mockito.doReturn(this.credentials).when(this.appContext)).getAppCredentials();
        NodeId newInstance = NodeId.newInstance("localhost", 0);
        AMContainer aMContainer = (AMContainer) Mockito.mock(AMContainer.class);
        Container container = (Container) Mockito.mock(Container.class);
        ((Container) Mockito.doReturn(newInstance).when(container)).getNodeId();
        ((AMContainerMap) Mockito.doReturn(aMContainer).when(this.amContainerMap)).get((ContainerId) Matchers.any(ContainerId.class));
        ((AMContainer) Mockito.doReturn(container).when(aMContainer)).getContainer();
        try {
            this.taskAttemptListener = new TaskCommunicatorManagerInterfaceImplForTest(this.appContext, (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), (ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor[]{new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), (String) null).setUserPayload(TezUtils.createUserPayloadFromConf(new TezConfiguration()))}));
            this.taskSpec = (TaskSpec) Mockito.mock(TaskSpec.class);
            ((TaskSpec) Mockito.doReturn(this.taskAttemptID).when(this.taskSpec)).getTaskAttemptID();
            this.amContainerTask = new AMContainerTask(this.taskSpec, (Map) null, (Credentials) null, false, 0);
            this.containerTask = null;
        } catch (IOException e) {
            throw new TezUncheckedException(e);
        }
    }

    @Test(timeout = 5000)
    public void testGetTask() throws IOException {
        TezTaskUmbilicalProtocol umbilical = this.taskAttemptListener.getTaskCommunicator(0).getTaskCommunicator().getUmbilical();
        this.containerTask = umbilical.getTask(new ContainerContext(createContainerId(this.appId, 1).toString()));
        Assert.assertTrue(this.containerTask.shouldDie());
        ContainerId createContainerId = createContainerId(this.appId, 2);
        ContainerContext containerContext = new ContainerContext(createContainerId.toString());
        this.taskAttemptListener.registerRunningContainer(createContainerId, 0);
        this.containerTask = umbilical.getTask(containerContext);
        Assert.assertNull(this.containerTask);
        this.taskAttemptListener.registerTaskAttempt(this.amContainerTask, createContainerId, 0);
        this.containerTask = umbilical.getTask(containerContext);
        Assert.assertFalse(this.containerTask.shouldDie());
        Assert.assertEquals(this.taskSpec, this.containerTask.getTaskSpec());
        this.taskAttemptListener.unregisterTaskAttempt(this.taskAttemptID, 0, TaskAttemptEndReason.OTHER, (String) null);
        this.containerTask = umbilical.getTask(containerContext);
        Assert.assertNull(this.containerTask);
        this.taskAttemptListener.unregisterRunningContainer(createContainerId, 0, ContainerEndReason.OTHER, (String) null);
        this.containerTask = umbilical.getTask(containerContext);
        Assert.assertTrue(this.containerTask.shouldDie());
        ContainerId createContainerId2 = createContainerId(this.appId, 3);
        ContainerContext containerContext2 = new ContainerContext(createContainerId2.toString());
        this.taskAttemptListener.registerRunningContainer(createContainerId2, 0);
        ((TaskSpec) Mockito.doReturn((TezTaskAttemptID) Mockito.mock(TezTaskAttemptID.class)).when((TaskSpec) Mockito.mock(TaskSpec.class))).getTaskAttemptID();
        this.taskAttemptListener.registerTaskAttempt(new AMContainerTask(this.taskSpec, (Map) null, (Credentials) null, false, 0), createContainerId2, 0);
        this.taskAttemptListener.unregisterRunningContainer(createContainerId2, 0, ContainerEndReason.OTHER, (String) null);
        this.containerTask = umbilical.getTask(containerContext2);
        Assert.assertTrue(this.containerTask.shouldDie());
    }

    @Test(timeout = 5000)
    public void testGetTaskMultiplePulls() throws IOException {
        TezTaskUmbilicalProtocol umbilical = this.taskAttemptListener.getTaskCommunicator(0).getTaskCommunicator().getUmbilical();
        ContainerId createContainerId = createContainerId(this.appId, 1);
        ContainerContext containerContext = new ContainerContext(createContainerId.toString());
        this.taskAttemptListener.registerRunningContainer(createContainerId, 0);
        this.containerTask = umbilical.getTask(containerContext);
        Assert.assertNull(this.containerTask);
        this.taskAttemptListener.registerTaskAttempt(this.amContainerTask, createContainerId, 0);
        this.containerTask = umbilical.getTask(containerContext);
        Assert.assertFalse(this.containerTask.shouldDie());
        Assert.assertEquals(this.taskSpec, this.containerTask.getTaskSpec());
        this.containerTask = umbilical.getTask(containerContext);
        Assert.assertNull(this.containerTask);
    }

    @Test(timeout = 5000)
    public void testTaskEventRouting() throws Exception {
        generateHeartbeat(Arrays.asList(new TezEvent(new TaskStatusUpdateEvent((TezCounters) null, 0.0f, (TaskStatistics) null, false), new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "v1", "v2", this.taskAttemptID)), new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "v1", "v2", this.taskAttemptID)), new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventMetaData.EventProducerConsumerType.SYSTEM, "v1", "v2", this.taskAttemptID))), 0, 1, 0, new ArrayList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        ((EventHandler) Mockito.verify(this.eventHandler, Mockito.times(4))).handle((Event) forClass.capture());
        List allValues = forClass.getAllValues();
        TaskAttemptEventStatusUpdate taskAttemptEventStatusUpdate = (Event) allValues.get(0);
        Assert.assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, taskAttemptEventStatusUpdate.getType());
        Assert.assertEquals(false, Boolean.valueOf(taskAttemptEventStatusUpdate.getReadErrorReported()));
        TaskAttemptEventTezEventUpdate taskAttemptEventTezEventUpdate = (TaskAttemptEventTezEventUpdate) allValues.get(1);
        Assert.assertEquals(1L, taskAttemptEventTezEventUpdate.getTezEvents().size());
        Assert.assertEquals(EventType.DATA_MOVEMENT_EVENT, ((TezEvent) taskAttemptEventTezEventUpdate.getTezEvents().get(0)).getEventType());
        Assert.assertEquals(TaskAttemptEventType.TA_DONE, ((TaskAttemptEvent) allValues.get(2)).getType());
        VertexEventRouteEvent vertexEventRouteEvent = (VertexEventRouteEvent) allValues.get(3);
        Assert.assertEquals(1L, vertexEventRouteEvent.getEvents().size());
        Assert.assertEquals(EventType.DATA_MOVEMENT_EVENT, ((TezEvent) vertexEventRouteEvent.getEvents().get(0)).getEventType());
    }

    @Test(timeout = 5000)
    public void testTaskEventRoutingWithReadError() throws Exception {
        generateHeartbeat(Arrays.asList(new TezEvent(new TaskStatusUpdateEvent((TezCounters) null, 0.0f, (TaskStatistics) null, false), (EventMetaData) null), new TezEvent(InputReadErrorEvent.create("", 0, 0), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "v2", "v1", this.taskAttemptID)), new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventMetaData.EventProducerConsumerType.SYSTEM, "v1", "v2", this.taskAttemptID))), 0, 1, 0, new ArrayList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        ((EventHandler) Mockito.verify(this.eventHandler, Mockito.times(3))).handle((Event) forClass.capture());
        List allValues = forClass.getAllValues();
        TaskAttemptEventStatusUpdate taskAttemptEventStatusUpdate = (Event) allValues.get(0);
        Assert.assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE, taskAttemptEventStatusUpdate.getType());
        Assert.assertEquals(true, Boolean.valueOf(taskAttemptEventStatusUpdate.getReadErrorReported()));
        Assert.assertEquals("Second event should be TA_DONE", TaskAttemptEventType.TA_DONE, ((Event) allValues.get(1)).getType());
        VertexEventRouteEvent vertexEventRouteEvent = (Event) allValues.get(2);
        Assert.assertEquals("Third event should be routed to vertex", VertexEventType.V_ROUTE_EVENT, vertexEventRouteEvent.getType());
        Assert.assertEquals(EventType.INPUT_READ_ERROR_EVENT, ((TezEvent) vertexEventRouteEvent.getEvents().get(0)).getEventType());
    }

    @Test(timeout = 5000)
    public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
        generateHeartbeat(Arrays.asList(new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(EventMetaData.EventProducerConsumerType.SYSTEM, "v1", "v2", this.taskAttemptID))), 0, 1, 0, new ArrayList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        ((EventHandler) Mockito.verify(this.eventHandler, Mockito.times(1))).handle((Event) forClass.capture());
        Assert.assertEquals("only event should be route event", TaskAttemptEventType.TA_DONE, ((Event) forClass.getAllValues().get(0)).getType());
    }

    @Test(timeout = 5000)
    public void testTaskHeartbeatResponse() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        TaskHeartbeatResponse generateHeartbeat = generateHeartbeat(arrayList, 0, 1, 2, arrayList2);
        Assert.assertEquals(2L, generateHeartbeat.getNextFromEventId());
        Assert.assertEquals(arrayList2, generateHeartbeat.getEvents());
    }

    @Test(timeout = 5000)
    public void testPortRange() {
        boolean z = false;
        Random random = new Random();
        int i = 0;
        while (true) {
            if (i >= 10) {
                break;
            }
            if (testPortRange(1024 + random.nextInt(64511))) {
                z = true;
                break;
            }
            i++;
        }
        if (z) {
            return;
        }
        Assert.fail("Can not allocate free port even in 10 iterations for TaskAttemptListener");
    }

    @Test(timeout = 5000)
    public void testPortRange_NotSpecified() throws IOException, TezException {
        Configuration configuration = new Configuration();
        JobTokenIdentifier jobTokenIdentifier = new JobTokenIdentifier(new Text("fakeIdentifier"));
        Token token = new Token(jobTokenIdentifier, new JobTokenSecretManager());
        token.setService(jobTokenIdentifier.getJobId());
        TokenCache.setSessionToken(token, this.credentials);
        this.taskAttemptListener = new TaskCommunicatorManager(this.appContext, (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), (ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor[]{new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), (String) null).setUserPayload(TezUtils.createUserPayloadFromConf(configuration))}));
        this.taskAttemptListener.init(configuration);
        this.taskAttemptListener.start();
    }

    private boolean testPortRange(int i) {
        boolean z = true;
        try {
            Configuration configuration = new Configuration();
            JobTokenIdentifier jobTokenIdentifier = new JobTokenIdentifier(new Text("fakeIdentifier"));
            Token token = new Token(jobTokenIdentifier, new JobTokenSecretManager());
            token.setService(jobTokenIdentifier.getJobId());
            TokenCache.setSessionToken(token, this.credentials);
            configuration.set("tez.am.task.am.port-range", i + "-" + i);
            this.taskAttemptListener = new TaskCommunicatorManager(this.appContext, (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), (ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor[]{new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), (String) null).setUserPayload(TezUtils.createUserPayloadFromConf(configuration))}));
            this.taskAttemptListener.init(configuration);
            this.taskAttemptListener.start();
            Assert.assertEquals(i, this.taskAttemptListener.getTaskCommunicator(0).getAddress().getPort());
            if (this.taskAttemptListener != null) {
                try {
                    this.taskAttemptListener.close();
                } catch (IOException e) {
                    e.printStackTrace();
                    Assert.fail("fail to stop TaskAttemptListener");
                }
            }
        } catch (Exception e2) {
            z = false;
            if (this.taskAttemptListener != null) {
                try {
                    this.taskAttemptListener.close();
                } catch (IOException e3) {
                    e3.printStackTrace();
                    Assert.fail("fail to stop TaskAttemptListener");
                }
            }
        } catch (Throwable th) {
            if (this.taskAttemptListener != null) {
                try {
                    this.taskAttemptListener.close();
                } catch (IOException e4) {
                    e4.printStackTrace();
                    Assert.fail("fail to stop TaskAttemptListener");
                }
            }
            throw th;
        }
        return z;
    }

    private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> list, int i, int i2, int i3, List<TezEvent> list2) throws IOException, TezException {
        ContainerId createContainerId = createContainerId(this.appId, 1);
        Vertex vertex = (Vertex) Mockito.mock(Vertex.class);
        ((DAG) Mockito.doReturn(vertex).when(this.dag)).getVertex(this.vertexID);
        ((Vertex) Mockito.doReturn("test_vertex").when(vertex)).getName();
        ((Vertex) Mockito.doReturn(new TaskAttemptEventInfo(i3, list2, 0)).when(vertex)).getTaskAttemptTezEvents(this.taskAttemptID, i, 0, i2);
        this.taskAttemptListener.registerRunningContainer(createContainerId, 0);
        this.taskAttemptListener.registerTaskAttempt(this.amContainerTask, createContainerId, 0);
        TaskHeartbeatRequest taskHeartbeatRequest = (TaskHeartbeatRequest) Mockito.mock(TaskHeartbeatRequest.class);
        ((TaskHeartbeatRequest) Mockito.doReturn(createContainerId.toString()).when(taskHeartbeatRequest)).getContainerIdentifier();
        ((TaskHeartbeatRequest) Mockito.doReturn(createContainerId.toString()).when(taskHeartbeatRequest)).getContainerIdentifier();
        ((TaskHeartbeatRequest) Mockito.doReturn(this.taskAttemptID).when(taskHeartbeatRequest)).getTaskAttemptId();
        ((TaskHeartbeatRequest) Mockito.doReturn(list).when(taskHeartbeatRequest)).getEvents();
        ((TaskHeartbeatRequest) Mockito.doReturn(Integer.valueOf(i2)).when(taskHeartbeatRequest)).getMaxEvents();
        ((TaskHeartbeatRequest) Mockito.doReturn(Integer.valueOf(i)).when(taskHeartbeatRequest)).getStartIndex();
        return this.taskAttemptListener.heartbeat(taskHeartbeatRequest);
    }

    private ContainerId createContainerId(ApplicationId applicationId, int i) {
        return ContainerId.newInstance(ApplicationAttemptId.newInstance(applicationId, 1), i);
    }
}
