/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
import org.slf4j.event.Level;

public class MRAppBenchmark {
    private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

    public void run(MRApp app) throws Exception {
        GenericTestUtils.setRootLogLevel((Level)Level.WARN);
        long startTime = System.currentTimeMillis();
        Job job = app.submit(new Configuration());
        while (!job.getReport().getJobState().equals((Object)JobState.SUCCEEDED)) {
            this.printStat(job, startTime);
            Thread.sleep(2000L);
        }
        this.printStat(job, startTime);
    }

    private void printStat(Job job, long startTime) throws Exception {
        long currentTime = System.currentTimeMillis();
        Runtime.getRuntime().gc();
        long mem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        System.out.println("JobState:" + job.getState() + " CompletedMaps:" + job.getCompletedMaps() + " CompletedReduces:" + job.getCompletedReduces() + " Memory(total-free)(KB):" + mem / 1024L + " ElapsedTime(ms):" + (currentTime - startTime));
    }

    @Test(timeout=60000L)
    public void benchmark1() throws Exception {
        int maps = 100;
        int reduces = 0;
        System.out.println("Running benchmark with maps:" + maps + " reduces:" + reduces);
        this.run(new MRApp(maps, reduces, true, this.getClass().getName(), true){

            @Override
            protected ContainerAllocator createContainerAllocator(ClientService clientService, AppContext context) {
                NoopAMPreemptionPolicy policy = new NoopAMPreemptionPolicy();
                return new RMContainerAllocator(clientService, context, (AMPreemptionPolicy)policy){

                    protected ApplicationMasterProtocol createSchedulerProxy() {
                        return new ApplicationMasterProtocol(){

                            public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws IOException {
                                RegisterApplicationMasterResponse response = (RegisterApplicationMasterResponse)Records.newRecord(RegisterApplicationMasterResponse.class);
                                response.setMaximumResourceCapability(Resource.newInstance((int)10240, (int)1));
                                response.setQueue("queue1");
                                return response;
                            }

                            public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws IOException {
                                FinishApplicationMasterResponse response = (FinishApplicationMasterResponse)Records.newRecord(FinishApplicationMasterResponse.class);
                                return response;
                            }

                            public AllocateResponse allocate(AllocateRequest request) throws IOException {
                                AllocateResponse response = (AllocateResponse)Records.newRecord(AllocateResponse.class);
                                List askList = request.getAskList();
                                ArrayList<Container> containers = new ArrayList<Container>();
                                for (ResourceRequest req : askList) {
                                    if (!ResourceRequest.isAnyLocation((String)req.getResourceName())) continue;
                                    int numContainers = req.getNumContainers();
                                    for (int i = 0; i < numContainers; ++i) {
                                        ContainerId containerId = ContainerId.newContainerId((ApplicationAttemptId)this.getContext().getApplicationAttemptId(), (long)(request.getResponseId() + i));
                                        containers.add(Container.newInstance((ContainerId)containerId, (NodeId)NodeId.newInstance((String)("host" + containerId.getContainerId()), (int)2345), (String)("host" + containerId.getContainerId() + ":5678"), (Resource)req.getCapability(), (Priority)req.getPriority(), null));
                                    }
                                }
                                response.setAllocatedContainers(containers);
                                response.setResponseId(request.getResponseId() + 1);
                                response.setNumClusterNodes(350);
                                response.setApplicationPriority(Priority.newInstance((int)100));
                                return response;
                            }
                        };
                    }
                };
            }
        });
    }

    @Test(timeout=60000L)
    public void benchmark2() throws Exception {
        int maps = 100;
        int reduces = 50;
        int maxConcurrentRunningTasks = 500;
        System.out.println("Running benchmark with throttled running tasks with maxConcurrentRunningTasks:" + maxConcurrentRunningTasks + " maps:" + maps + " reduces:" + reduces);
        this.run(new ThrottledMRApp(maps, reduces, maxConcurrentRunningTasks));
    }

    public static void main(String[] args) throws Exception {
        MRAppBenchmark benchmark = new MRAppBenchmark();
        benchmark.benchmark1();
        benchmark.benchmark2();
    }

    static class ThrottledMRApp
    extends MRApp {
        int maxConcurrentRunningTasks;
        volatile int concurrentRunningTasks;

        ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
            super(maps, reduces, true, "ThrottledMRApp", true);
            this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
        }

        @Override
        protected void attemptLaunched(TaskAttemptId attemptID) {
            super.attemptLaunched(attemptID);
            --this.concurrentRunningTasks;
        }

        @Override
        protected ContainerAllocator createContainerAllocator(ClientService clientService, AppContext context) {
            return new ThrottledContainerAllocator();
        }

        class ThrottledContainerAllocator
        extends AbstractService
        implements ContainerAllocator,
        RMHeartbeatHandler {
            private int containerCount;
            private Thread thread;
            private BlockingQueue<ContainerAllocatorEvent> eventQueue;

            public ThrottledContainerAllocator() {
                super("ThrottledContainerAllocator");
                this.eventQueue = new LinkedBlockingQueue<ContainerAllocatorEvent>();
            }

            public void handle(ContainerAllocatorEvent event) {
                try {
                    this.eventQueue.put(event);
                }
                catch (InterruptedException e) {
                    throw new YarnRuntimeException((Throwable)e);
                }
            }

            protected void serviceStart() throws Exception {
                this.thread = new Thread(new Runnable(){

                    @Override
                    public void run() {
                        ContainerAllocatorEvent event = null;
                        while (!Thread.currentThread().isInterrupted()) {
                            try {
                                if (ThrottledMRApp.this.concurrentRunningTasks < ThrottledMRApp.this.maxConcurrentRunningTasks) {
                                    event = (ContainerAllocatorEvent)ThrottledContainerAllocator.this.eventQueue.take();
                                    ContainerId cId = ContainerId.newContainerId((ApplicationAttemptId)ThrottledMRApp.this.getContext().getApplicationAttemptId(), (long)ThrottledContainerAllocator.this.containerCount++);
                                    Container container = (Container)recordFactory.newRecordInstance(Container.class);
                                    container.setId(cId);
                                    NodeId nodeId = NodeId.newInstance((String)"dummy", (int)1234);
                                    container.setNodeId(nodeId);
                                    container.setContainerToken(null);
                                    container.setNodeHttpAddress("localhost:8042");
                                    ThrottledMRApp.this.getContext().getEventHandler().handle((Event)new TaskAttemptContainerAssignedEvent(event.getAttemptID(), container, null));
                                    ++ThrottledMRApp.this.concurrentRunningTasks;
                                    continue;
                                }
                                Thread.sleep(1000L);
                            }
                            catch (InterruptedException e) {
                                System.out.println("Returning, interrupted");
                                return;
                            }
                        }
                    }
                });
                this.thread.start();
                super.serviceStart();
            }

            protected void serviceStop() throws Exception {
                if (this.thread != null) {
                    this.thread.interrupt();
                }
                super.serviceStop();
            }

            public long getLastHeartbeatTime() {
                return Time.now();
            }

            public void runOnNextHeartbeat(Runnable callback) {
            }
        }
    }
}

