package org.apache.tez.dag.history.logging.ats;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.common.security.HistoryACLPolicyManager;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.class */
/**
 * Tests for {@link ATSHistoryLoggingService}, run against a mocked {@link TimelineClient}
 * and a mocked {@link HistoryACLPolicyManager}.
 *
 * <p>Every call to the mocked {@code putEntities} is counted (number of calls and number of
 * entities passed), and the tests assert on those counters and on the interactions recorded
 * by the ACL policy manager mock.
 *
 * <p>NOTE(review): the tests are timing based (fixed sleeps plus polling of the service's
 * event queue); the expected counts depend on how the service batches and flushes events,
 * which is not visible from this file.
 */
public class TestATSHistoryLoggingService {
    private static final Logger LOG = LoggerFactory.getLogger(TestATSHistoryLoggingService.class);

    /** Configuration key under which the session-level timeline domain id is published. */
    private static final String SESSION_DOMAIN_ID_KEY = "tez.yarn.ats.acl.session.domain.id";
    /** Configuration key under which the DAG-level timeline domain id is published. */
    private static final String DAG_DOMAIN_ID_KEY = "tez.yarn.ats.acl.dag.domain.id";
    /** Number of times the same DAG-started event is pushed by the batching/shutdown tests. */
    private static final int REPEATED_EVENT_COUNT = 100;

    private static final ApplicationId appId = ApplicationId.newInstance(1000, 1);
    private static final ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);

    private ATSHistoryLoggingService atsHistoryLoggingService;
    private AppContext appContext;
    private Configuration conf;
    private HistoryACLPolicyManager historyACLPolicyManager;
    private final SystemClock clock = new SystemClock();

    // Updated from inside the mocked putEntities() answer and read by the test thread.
    // volatile so the test thread observes the updates; assumes the service invokes the
    // timeline client from a single thread of its own (TODO confirm), hence no atomics.
    private volatile int atsInvokeCounter;
    private volatile int atsEntitiesCounter;

    /**
     * Builds the service under test with a small batch size and flush timeout, then swaps in
     * the mocked ACL policy manager and timeline client after {@code init}.
     */
    @Before
    public void setup() throws Exception {
        this.appContext = Mockito.mock(AppContext.class);
        this.historyACLPolicyManager = Mockito.mock(HistoryACLPolicyManager.class);
        this.atsHistoryLoggingService = new ATSHistoryLoggingService();
        this.atsHistoryLoggingService.setAppContext(this.appContext);

        this.conf = new Configuration(false);
        this.conf.setLong("tez.yarn.ats.event.flush.timeout.millis", 1000L);
        this.conf.setInt("tez.yarn.ats.max.events.per.batch", 2);
        this.conf.setBoolean("tez.allow.disabled.timeline-domains", true);
        this.conf.set(SESSION_DOMAIN_ID_KEY, "test-domain");

        this.atsInvokeCounter = 0;
        this.atsEntitiesCounter = 0;

        // The mocks are installed after init() so that init() does not replace them.
        this.atsHistoryLoggingService.init(this.conf);
        this.atsHistoryLoggingService.historyACLPolicyManager = this.historyACLPolicyManager;
        this.atsHistoryLoggingService.timelineClient = Mockito.mock(TimelineClient.class);

        Mockito.when(this.appContext.getClock()).thenReturn(this.clock);
        Mockito.when(this.appContext.getCurrentDAGID()).thenReturn(null);
        Mockito.when(this.appContext.getApplicationID()).thenReturn(appId);
        Mockito.when(this.atsHistoryLoggingService.timelineClient.putEntities(
                (TimelineEntity[]) Matchers.anyVararg())).thenAnswer(countingAnswer(500L));
    }

    /** Stops the service under test and drops the reference to it. */
    @After
    public void teardown() {
        this.atsHistoryLoggingService.stop();
        this.atsHistoryLoggingService = null;
    }

    /**
     * Pushes many events, stops the service after 2.5s and checks that only a bounded number
     * of entities (at least 4, fewer than 20) reached the timeline client.
     */
    @Test(timeout = 20000)
    public void testATSHistoryLoggingServiceShutdown() throws Exception {
        this.atsHistoryLoggingService.start();
        publishRepeatedly(this.atsHistoryLoggingService, newDagStartedEvent(), REPEATED_EVENT_COUNT);

        Thread.sleep(2500L);
        this.atsHistoryLoggingService.stop();

        logCounters();
        Assert.assertTrue(this.atsEntitiesCounter >= 4);
        Assert.assertTrue(this.atsEntitiesCounter < 20);
    }

    /**
     * Pushes many events and checks that entities were sent in batches of two (the configured
     * maximum batch size): twice as many entities as timeline invocations.
     */
    @Test(timeout = 20000)
    public void testATSEventBatching() throws Exception {
        this.atsHistoryLoggingService.start();
        publishRepeatedly(this.atsHistoryLoggingService, newDagStartedEvent(), REPEATED_EVENT_COUNT);

        Thread.sleep(1000L);

        logCounters();
        Assert.assertTrue(this.atsEntitiesCounter > this.atsInvokeCounter);
        Assert.assertEquals(this.atsEntitiesCounter / 2, this.atsInvokeCounter);
    }

    /**
     * Initializes a second service with {@code yarn.timeline-service.enabled=false} and checks
     * that nothing is sent and that its timeline client ends up {@code null}.
     */
    @Test(timeout = 20000)
    public void testTimelineServiceDisable() throws Exception {
        this.atsHistoryLoggingService.start();

        ATSHistoryLoggingService disabledService = new ATSHistoryLoggingService();
        disabledService.setAppContext(this.appContext);
        disabledService.timelineClient = Mockito.mock(TimelineClient.class);
        Mockito.when(disabledService.timelineClient.putEntities(
                (TimelineEntity[]) Matchers.anyVararg())).thenAnswer(countingAnswer(10L));

        this.conf.setBoolean("yarn.timeline-service.enabled", false);
        this.conf.set("tez.history.logging.service.class", ATSHistoryLoggingService.class.getName());
        disabledService.init(this.conf);
        disabledService.start();
        try {
            publishRepeatedly(disabledService, newDagStartedEvent(), REPEATED_EVENT_COUNT);

            Thread.sleep(20L);

            logCounters();
            Assert.assertEquals(0L, this.atsInvokeCounter);
            Assert.assertEquals(0L, this.atsEntitiesCounter);
            Assert.assertNull(disabledService.timelineClient);
        } finally {
            // Close even when an assertion fails so the extra service does not outlive the test.
            disabledService.close();
        }
    }

    /** Non-session mode: all six entity updates are expected to use the session domain id. */
    @Test(timeout = 10000)
    public void testNonSessionDomains() throws Exception {
        Mockito.when(this.historyACLPolicyManager.setupSessionACLs(anyConf(), anyAppId()))
                .thenReturn(Collections.singletonMap(SESSION_DOMAIN_ID_KEY, "session-id"));

        startServiceAndPublishEvents();
        Thread.sleep(2500L);
        awaitQueueDrain(100L);

        verifySessionDagAclSetups(0);
        verifyDomainUpdates(6, "session-id");
    }

    /** Non-session mode with failing session ACL setup: nothing is expected to be sent. */
    @Test(timeout = 10000)
    public void testNonSessionDomainsFailed() throws Exception {
        Mockito.when(this.historyACLPolicyManager.setupSessionACLs(anyConf(), anyAppId()))
                .thenThrow(new IOException());

        startServiceAndPublishEvents();
        awaitQueueDrain(1000L);

        verifySessionDagAclSetups(0);
        verifyDomainUpdates(0, "session-id");
        Assert.assertEquals(0L, this.atsEntitiesCounter);
    }

    /**
     * Non-session mode where session ACL setup returns {@code null}: six entities are expected
     * to be sent without any domain update.
     */
    @Test(timeout = 10000)
    public void testNonSessionDomainsAclNull() throws Exception {
        Mockito.when(this.historyACLPolicyManager.setupSessionACLs(anyConf(), anyAppId()))
                .thenReturn(null);

        startServiceAndPublishEvents();
        Thread.sleep(2500L);
        awaitQueueDrain(100L);

        verifySessionDagAclSetups(0);
        verifyDomainUpdates(0, "session-id");
        Assert.assertEquals(6L, this.atsEntitiesCounter);
    }

    /**
     * Session mode: one entity update is expected with the session domain and five with the
     * DAG domain.
     */
    @Test(timeout = 10000)
    public void testSessionDomains() throws Exception {
        Mockito.when(this.historyACLPolicyManager.setupSessionACLs(anyConf(), anyAppId()))
                .thenReturn(Collections.singletonMap(SESSION_DOMAIN_ID_KEY, "test-domain"));
        Mockito.when(this.historyACLPolicyManager.setupSessionDAGACLs(
                anyConf(), anyAppId(), Matchers.eq("0"), anyDagAcls()))
                .thenReturn(Collections.singletonMap(DAG_DOMAIN_ID_KEY, "dag-domain"));
        Mockito.when(this.appContext.isSession()).thenReturn(true);

        startServiceAndPublishEvents();
        Thread.sleep(2500L);
        awaitQueueDrain(100L);

        verifySessionDagAclSetups(1);
        verifyDomainUpdates(1, "test-domain");
        verifyDomainUpdates(5, "dag-domain");
    }

    /** Session mode with failing session ACL setup: nothing is expected to be sent. */
    @Test(timeout = 10000)
    public void testSessionDomainsFailed() throws Exception {
        Mockito.when(this.historyACLPolicyManager.setupSessionACLs(anyConf(), anyAppId()))
                .thenThrow(new IOException());
        Mockito.when(this.historyACLPolicyManager.setupSessionDAGACLs(
                anyConf(), anyAppId(), Matchers.eq("0"), anyDagAcls()))
                .thenReturn(Collections.singletonMap(DAG_DOMAIN_ID_KEY, "dag-domain"));
        Mockito.when(this.appContext.isSession()).thenReturn(true);

        startServiceAndPublishEvents();
        awaitQueueDrain(1000L);

        verifySessionDagAclSetups(0);
        verifyAnyDomainUpdates(0);
        Assert.assertEquals(0L, this.atsEntitiesCounter);
    }

    /**
     * Session mode with failing DAG ACL setup: only one entity, tagged with the session
     * domain, is expected to be sent.
     */
    @Test(timeout = 10000)
    public void testSessionDomainsDagFailed() throws Exception {
        Mockito.when(this.historyACLPolicyManager.setupSessionACLs(anyConf(), anyAppId()))
                .thenReturn(Collections.singletonMap(SESSION_DOMAIN_ID_KEY, "session-domain"));
        Mockito.when(this.historyACLPolicyManager.setupSessionDAGACLs(
                anyConf(), anyAppId(), Matchers.eq("0"), anyDagAcls()))
                .thenThrow(new IOException());
        Mockito.when(this.appContext.isSession()).thenReturn(true);

        startServiceAndPublishEvents();
        Thread.sleep(2500L);
        awaitQueueDrain(100L);

        verifySessionDagAclSetups(1);
        verifyDomainUpdates(1, "session-domain");
        verifyAnyDomainUpdates(1);
        Assert.assertEquals(1L, this.atsEntitiesCounter);
    }

    /**
     * Session mode where both ACL setups return {@code null}: six entities are expected to be
     * sent without any domain update.
     */
    @Test(timeout = 10000)
    public void testSessionDomainsAclNull() throws Exception {
        Mockito.when(this.historyACLPolicyManager.setupSessionACLs(anyConf(), anyAppId()))
                .thenReturn(null);
        Mockito.when(this.historyACLPolicyManager.setupSessionDAGACLs(
                anyConf(), anyAppId(), Matchers.eq("0"), anyDagAcls()))
                .thenReturn(null);
        Mockito.when(this.appContext.isSession()).thenReturn(true);

        startServiceAndPublishEvents();
        Thread.sleep(2500L);
        awaitQueueDrain(100L);

        verifySessionDagAclSetups(1);
        verifyAnyDomainUpdates(0);
        Assert.assertEquals(6L, this.atsEntitiesCounter);
    }

    /**
     * Creates a mock answer for {@code putEntities} that records the call and the number of
     * arguments it received, then sleeps to simulate a slow timeline server.
     *
     * @param delayMillis how long each mocked call blocks
     * @return an answer that always returns {@code null}
     */
    private Answer<Object> countingAnswer(final long delayMillis) {
        return new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                atsInvokeCounter++;
                atsEntitiesCounter += invocation.getArguments().length;
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ignored) {
                    // Deliberately swallowed: an interrupted mock call simply returns early,
                    // exactly as before; the interrupt status is intentionally not restored so
                    // the calling service thread behaves as it always has in these tests.
                }
                return null;
            }
        };
    }

    /** Builds a DAG-started history event for a DAG unrelated to {@link #appId}. */
    private static DAGHistoryEvent newDagStartedEvent() {
        TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(100L, 1), 1);
        return new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, 1001L, "user1", "dagName1"));
    }

    /** Hands the same event to {@code service} {@code count} times. */
    private static void publishRepeatedly(ATSHistoryLoggingService service, DAGHistoryEvent event, int count) {
        for (int i = 0; i < count; i++) {
            service.handle(event);
        }
    }

    /**
     * Starts the service under test, checks that session ACLs were set up exactly once during
     * start, and hands it the events from {@link #makeHistoryEvents}.
     */
    private void startServiceAndPublishEvents() throws Exception {
        this.atsHistoryLoggingService.start();
        Mockito.verify(this.historyACLPolicyManager, Mockito.times(1)).setupSessionACLs(anyConf(), anyAppId());
        for (DAGHistoryEvent event : makeHistoryEvents(TezDAGID.getInstance(appId, 0), this.atsHistoryLoggingService)) {
            this.atsHistoryLoggingService.handle(event);
        }
    }

    /** Polls the service's event queue every {@code pollMillis} until it is empty. */
    private void awaitQueueDrain(long pollMillis) throws InterruptedException {
        while (!this.atsHistoryLoggingService.eventQueue.isEmpty()) {
            Thread.sleep(pollMillis);
        }
    }

    /** Verifies how often DAG-level ACLs were set up for DAG "0" of {@link #appId}. */
    private void verifySessionDagAclSetups(int expectedCalls) throws Exception {
        Mockito.verify(this.historyACLPolicyManager, Mockito.times(expectedCalls))
                .setupSessionDAGACLs(anyConf(), Matchers.eq(appId), Matchers.eq("0"), anyDagAcls());
    }

    /** Verifies how many entities were tagged with exactly {@code domainId}. */
    private void verifyDomainUpdates(int expectedCalls, String domainId) throws Exception {
        Mockito.verify(this.historyACLPolicyManager, Mockito.times(expectedCalls))
                .updateTimelineEntityDomain(Matchers.any(), Matchers.eq(domainId));
    }

    /** Verifies how many entities were tagged with any domain at all. */
    private void verifyAnyDomainUpdates(int expectedCalls) throws Exception {
        Mockito.verify(this.historyACLPolicyManager, Mockito.times(expectedCalls))
                .updateTimelineEntityDomain(Matchers.any(), (String) Matchers.any());
    }

    /** Logs the current counter values in the same format used by all counting tests. */
    private void logCounters() {
        LOG.info("ATS entitiesSent=" + this.atsEntitiesCounter + ", timelineInvocations=" + this.atsInvokeCounter);
    }

    // Typed wrappers around Matchers.any(); they only exist to keep the stubbing and
    // verification lines readable and must be called in argument order like any matcher.
    private static Configuration anyConf() {
        return Matchers.<Configuration>any();
    }

    private static ApplicationId anyAppId() {
        return Matchers.<ApplicationId>any();
    }

    private static DAGAccessControls anyDagAcls() {
        return Matchers.<DAGAccessControls>any();
    }

    /**
     * Builds the fixed sequence of history events used by the domain tests: AM started (with
     * no DAG id), DAG submitted, vertex started, task started and task attempt started.
     *
     * @param tezDAGID DAG the DAG/vertex/task events belong to
     * @param aTSHistoryLoggingService service whose configuration is copied into the
     *        DAG-submitted event
     * @return the five events, in the order listed above
     */
    private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID tezDAGID, ATSHistoryLoggingService aTSHistoryLoggingService) {
        List<DAGHistoryEvent> events = new ArrayList<DAGHistoryEvent>();
        long now = System.currentTimeMillis();
        Configuration serviceConf = new Configuration(aTSHistoryLoggingService.getConfig());

        events.add(new DAGHistoryEvent((TezDAGID) null, new AMStartedEvent(attemptId, now, "user")));
        events.add(new DAGHistoryEvent(tezDAGID, new DAGSubmittedEvent(tezDAGID, now, DAGProtos.DAGPlan.getDefaultInstance(), attemptId, (Map) null, "user", serviceConf, (String) null, "default")));

        TezVertexID vertexId = TezVertexID.getInstance(tezDAGID, 1);
        events.add(new DAGHistoryEvent(tezDAGID, new VertexStartedEvent(vertexId, now, now)));

        TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
        events.add(new DAGHistoryEvent(tezDAGID, new TaskStartedEvent(taskId, "test", now, now)));
        events.add(new DAGHistoryEvent(tezDAGID, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(taskId, 1), "test", now, ContainerId.newContainerId(attemptId, 1L), NodeId.newInstance("localhost", 8765), (String) null, (String) null, (String) null)));
        return events;
    }
}
