package org.apache.tez.dag.app;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.TestMockDAGAppMaster;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.util.StopWatch;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/tez/dag/app/TestMemoryWithEvents.class */
public class TestMemoryWithEvents {
    static Configuration defaultConf;
    static FileSystem localFs;
    final int numThreads = 30;
    final int numTasks = 10000;

    /* loaded from: input_file:org/apache/tez/dag/app/TestMemoryWithEvents$SimulationInitializer.class */
    public static class SimulationInitializer extends InputInitializer {
        public SimulationInitializer(InputInitializerContext inputInitializerContext) {
            super(inputInitializerContext);
        }

        public List<Event> initialize() throws Exception {
            int numTasks = getContext().getNumTasks();
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(numTasks);
            for (int i = 0; i < numTasks; i++) {
                newArrayListWithCapacity.add(InputDataInformationEvent.createWithSerializedPayload(i, (ByteBuffer) null));
            }
            return newArrayListWithCapacity;
        }

        public void handleInputInitializerEvent(List<InputInitializerEvent> list) throws Exception {
        }
    }

    private void checkMemory(String str, MockDAGAppMaster mockDAGAppMaster) {
        Runtime runtime = Runtime.getRuntime();
        System.out.println("##### Heap utilization statistics [MB] for " + str);
        runtime.gc();
        System.out.println("##### Used Memory:" + ((runtime.totalMemory() - runtime.freeMemory()) / 1048576));
        System.out.println("##### Free Memory:" + (runtime.freeMemory() / 1048576));
        System.out.println("##### Total Memory:" + (runtime.totalMemory() / 1048576));
        System.out.println("##### Max Memory:" + (runtime.maxMemory() / 1048576));
        long j = mockDAGAppMaster.numHearbeats.get();
        if (j == 0) {
            j = 1;
        }
        PrintStream printStream = System.out;
        long j2 = (mockDAGAppMaster.heartbeatTime.get() / j) / 1000;
        long j3 = mockDAGAppMaster.heartbeatCpu.get() / 1000;
        long j4 = (mockDAGAppMaster.heartbeatCpu.get() / j) / 1000;
        mockDAGAppMaster.numHearbeats.get();
        printStream.println("##### Heartbeat (ms) : latency avg: " + j2 + " cpu total: " + printStream + " cpu avg: " + j3 + " numHeartbeats: " + printStream);
    }

    private void testMemory(DAG dag, boolean z) throws Exception {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        MockTezClient mockTezClient = new MockTezClient("testMockAM", new TezConfiguration(defaultConf), true, null, null, null, null, false, false, 30, 1000);
        mockTezClient.start();
        MockDAGAppMaster mockApp = mockTezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher containerLauncher = mockApp.getContainerLauncher();
        containerLauncher.startScheduling(false);
        mockApp.eventsDelegate = new TestMockDAGAppMaster.TestEventsDelegate();
        mockApp.doSleep = false;
        DAGClient submitDAG = mockTezClient.submitDAG(dag);
        containerLauncher.waitTillContainersLaunched();
        containerLauncher.startScheduling(true);
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.waitForCompletion().getState());
        checkMemory(dag.getName(), mockApp);
        stopWatch.stop();
        System.out.println("Time taken(ms): " + stopWatch.now(TimeUnit.MILLISECONDS));
        mockTezClient.stop();
    }

    @Test(timeout = 600000)
    @Ignore
    public void testMemoryRootInputEvents() throws Exception {
        DAG create = DAG.create("testMemoryRootInputEvents");
        Vertex create2 = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 10000);
        Vertex create3 = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 10000);
        create2.addDataSource("Input", DataSourceDescriptor.create(InputDescriptor.create("In"), InputInitializerDescriptor.create(SimulationInitializer.class.getName()), (Credentials) null));
        create.addVertex(create2).addVertex(create3);
        testMemory(create, false);
    }

    @Test(timeout = 600000)
    @Ignore
    public void testMemoryOneToOne() throws Exception {
        DAG create = DAG.create("testMemoryOneToOne");
        Vertex create2 = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 10000);
        Vertex create3 = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 10000);
        create.addVertex(create2).addVertex(create3).addEdge(Edge.create(create2, create3, EdgeProperty.create(EdgeProperty.DataMovementType.ONE_TO_ONE, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
        testMemory(create, true);
    }

    @Test(timeout = 600000)
    @Ignore
    public void testMemoryBroadcast() throws Exception {
        DAG create = DAG.create("testMemoryBroadcast");
        Vertex create2 = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 10000);
        Vertex create3 = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 10000);
        create.addVertex(create2).addVertex(create3).addEdge(Edge.create(create2, create3, EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
        testMemory(create, true);
    }

    @Test(timeout = 600000)
    @Ignore
    public void testMemoryScatterGather() throws Exception {
        DAG create = DAG.create("testMemoryScatterGather");
        Vertex create2 = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 10000);
        Vertex create3 = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 10000);
        create.addVertex(create2).addVertex(create3).addEdge(Edge.create(create2, create3, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
        testMemory(create, true);
    }

    static {
        try {
            defaultConf = new Configuration(false);
            defaultConf.set("fs.defaultFS", "file:///");
            defaultConf.setBoolean("tez.local.mode", true);
            localFs = FileSystem.getLocal(defaultConf);
            defaultConf.set("tez.staging-dir", "target/" + TestMemoryWithEvents.class.getName() + "-tmpDir");
            Logger.getRootLogger().setLevel(Level.WARN);
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }
}
