/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster;
import org.apache.tez.dag.app.MockTezClient;
import org.apache.tez.dag.app.TestMockDAGAppMaster;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.util.StopWatch;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

public class TestMemoryWithEvents {
    static Configuration defaultConf;
    static FileSystem localFs;
    final int numThreads = 30;
    final int numTasks = 10000;

    private void checkMemory(String name, MockDAGAppMaster mockApp) {
        long mb = 0x100000L;
        long microsPerMs = 1000L;
        Runtime runtime = Runtime.getRuntime();
        System.out.println("##### Heap utilization statistics [MB] for " + name);
        runtime.gc();
        System.out.println("##### Used Memory:" + (runtime.totalMemory() - runtime.freeMemory()) / mb);
        System.out.println("##### Free Memory:" + runtime.freeMemory() / mb);
        System.out.println("##### Total Memory:" + runtime.totalMemory() / mb);
        System.out.println("##### Max Memory:" + runtime.maxMemory() / mb);
        long numHeartbeats = mockApp.numHearbeats.get();
        if (numHeartbeats == 0L) {
            numHeartbeats = 1L;
        }
        System.out.println("##### Heartbeat (ms) : latency avg: " + mockApp.heartbeatTime.get() / numHeartbeats / microsPerMs + " cpu total: " + mockApp.heartbeatCpu.get() / microsPerMs + " cpu avg: " + mockApp.heartbeatCpu.get() / numHeartbeats / microsPerMs + " numHeartbeats: " + mockApp.numHearbeats.get());
    }

    private void testMemory(DAG dag, boolean sendDMEvents) throws Exception {
        StopWatch stopwatch = new StopWatch();
        stopwatch.start();
        TezConfiguration tezconf = new TezConfiguration(defaultConf);
        MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null, false, false, 30, 1000);
        tezClient.start();
        MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
        MockDAGAppMaster.MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
        mockLauncher.startScheduling(false);
        mockApp.eventsDelegate = new TestMockDAGAppMaster.TestEventsDelegate();
        mockApp.doSleep = false;
        DAGClient dagClient = tezClient.submitDAG(dag);
        mockLauncher.waitTillContainersLaunched();
        mockLauncher.startScheduling(true);
        DAGStatus status = dagClient.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)status.getState());
        this.checkMemory(dag.getName(), mockApp);
        stopwatch.stop();
        System.out.println("Time taken(ms): " + stopwatch.now(TimeUnit.MILLISECONDS));
        tezClient.stop();
    }

    @Ignore
    @Test(timeout=600000L)
    public void testMemoryRootInputEvents() throws Exception {
        DAG dag = DAG.create((String)"testMemoryRootInputEvents");
        Vertex vA = Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        Vertex vB = Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        vA.addDataSource("Input", DataSourceDescriptor.create((InputDescriptor)InputDescriptor.create((String)"In"), (InputInitializerDescriptor)InputInitializerDescriptor.create((String)SimulationInitializer.class.getName()), null));
        dag.addVertex(vA).addVertex(vB);
        this.testMemory(dag, false);
    }

    @Ignore
    @Test(timeout=600000L)
    public void testMemoryOneToOne() throws Exception {
        DAG dag = DAG.create((String)"testMemoryOneToOne");
        Vertex vA = Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        Vertex vB = Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        dag.addVertex(vA).addVertex(vB).addEdge(Edge.create((Vertex)vA, (Vertex)vB, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.ONE_TO_ONE, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"))));
        this.testMemory(dag, true);
    }

    @Ignore
    @Test(timeout=600000L)
    public void testMemoryBroadcast() throws Exception {
        DAG dag = DAG.create((String)"testMemoryBroadcast");
        Vertex vA = Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        Vertex vB = Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        dag.addVertex(vA).addVertex(vB).addEdge(Edge.create((Vertex)vA, (Vertex)vB, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"))));
        this.testMemory(dag, true);
    }

    @Ignore
    @Test(timeout=600000L)
    public void testMemoryScatterGather() throws Exception {
        DAG dag = DAG.create((String)"testMemoryScatterGather");
        Vertex vA = Vertex.create((String)"A", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        Vertex vB = Vertex.create((String)"B", (ProcessorDescriptor)ProcessorDescriptor.create((String)"Proc.class"), (int)10000);
        dag.addVertex(vA).addVertex(vB).addEdge(Edge.create((Vertex)vA, (Vertex)vB, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"))));
        this.testMemory(dag, true);
    }

    static {
        try {
            defaultConf = new Configuration(false);
            defaultConf.set("fs.defaultFS", "file:///");
            defaultConf.setBoolean("tez.local.mode", true);
            localFs = FileSystem.getLocal((Configuration)defaultConf);
            String stagingDir = "target/" + TestMemoryWithEvents.class.getName() + "-tmpDir";
            defaultConf.set("tez.staging-dir", stagingDir);
            Logger.getRootLogger().setLevel(Level.WARN);
        }
        catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }

    public static class SimulationInitializer
    extends InputInitializer {
        public SimulationInitializer(InputInitializerContext initializerContext) {
            super(initializerContext);
        }

        public List<Event> initialize() throws Exception {
            int numTasks = this.getContext().getNumTasks();
            ArrayList events = Lists.newArrayListWithCapacity((int)numTasks);
            for (int i = 0; i < numTasks; ++i) {
                events.add(InputDataInformationEvent.createWithSerializedPayload((int)i, null));
            }
            return events;
        }

        public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
        }
    }
}

