/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.tests;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.service.MiniTezTestServiceCluster;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.apache.tez.tests.TestExternalTezServices;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the hash-join example jobs with {@code tez.local.mode} enabled while a
 * {@link MiniTezTestServiceCluster} is available as an external service, and verifies
 * how many submissions the external cluster records depending on which vertices are
 * assigned the external execution context.
 */
public class TestExtServicesWithLocalMode {
    private static final Logger LOG = LoggerFactory.getLogger(TestExtServicesWithLocalMode.class);

    /** Name under which the scheduler, launcher and communicator plugins are registered. */
    private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";

    /** Root of the data written by this test; removed again in {@link #tearDown()}. */
    private static final String TEST_ROOT_DIR =
            "target/" + TestExtServicesWithLocalMode.class.getName() + "-tmpDir";

    /** Value configured as {@code tez.staging-dir} for the jobs run by this test. */
    private static final String STAGING_DIR =
            new File(System.getProperty("test.build.data"),
                    TestExtServicesWithLocalMode.class.getName()).getAbsolutePath();

    private static final Path SRC_DATA_DIR = new Path(TEST_ROOT_DIR + "/data");
    private static final Path HASH_JOIN_EXPECTED_RESULT_PATH =
            new Path(SRC_DATA_DIR, "expectedOutputPath");
    private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");

    /** Execution context that names the external plugin for all three plugin roles. */
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
            Vertex.VertexExecutionContext.create(
                    EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);

    private static volatile Configuration clusterConf = new Configuration();
    private static volatile FileSystem localFs;
    private static volatile MiniTezTestServiceCluster tezTestServiceCluster;
    private static volatile Configuration confForJobs;

    /**
     * Starts the mini test-service cluster and builds the job configuration: a copy of
     * the cluster configuration, overlaid with the cluster-specific entries, with local
     * mode switched on and the staging directory set.
     *
     * @throws Exception if the local file system or the cluster cannot be set up
     */
    @BeforeClass
    public static void setup() throws Exception {
        localFs = FileSystem.getLocal(clusterConf).getRaw();

        // Half of the JVM's maximum heap is handed to the cluster as its memory argument.
        long jvmMax = Runtime.getRuntime().maxMemory();
        // NOTE(review): the cluster is named after TestExternalTezServices rather than this
        // class - looks like a copy-paste; kept as-is to avoid changing the service name.
        tezTestServiceCluster = MiniTezTestServiceCluster.create(
                TestExternalTezServices.class.getSimpleName(), 3, (long) (jvmMax * 0.5), 1);
        tezTestServiceCluster.init(clusterConf);
        tezTestServiceCluster.start();
        LOG.info("MiniTezTestServer started");

        confForJobs = new Configuration(clusterConf);
        for (Map.Entry<String, String> entry
                : tezTestServiceCluster.getClusterSpecificConfiguration()) {
            confForJobs.set(entry.getKey(), entry.getValue());
        }
        confForJobs.setBoolean("tez.local.mode", true);
        confForJobs.set("tez.staging-dir", STAGING_DIR);
    }

    /**
     * Stops the mini cluster and deletes the test root directory.
     *
     * <p>The directory clean-up runs even if stopping the cluster throws, and is skipped
     * when {@link #setup()} failed before the local file system was obtained, so that a
     * setup failure is not masked by a {@link NullPointerException} here.
     *
     * @throws IOException if the test root directory cannot be deleted
     * @throws TezException declared for compatibility with the original signature
     */
    @AfterClass
    public static void tearDown() throws IOException, TezException {
        try {
            if (tezTestServiceCluster != null) {
                tezTestServiceCluster.stop();
                tezTestServiceCluster = null;
            }
        } finally {
            if (localFs != null) {
                Path testRootDirPath = localFs.makeQualified(new Path(TEST_ROOT_DIR));
                LOG.info("CLeaning up path: {}", testRootDirPath);
                localFs.delete(testRootDirPath, true);
            }
        }
    }

    /**
     * Generates join input data, runs the hash join, and then validates the result three
     * times with different vertex placements, checking the external submission count
     * after each validation run.
     *
     * @throws Exception if any of the jobs or the client fails
     */
    @Test(timeout = 30000L)
    public void test1() throws Exception {
        // The same payload (the serialized job configuration) is given to every plugin.
        UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs);

        TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
            TaskSchedulerDescriptor
                    .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
                    .setUserPayload(userPayload)};
        ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
            ContainerLauncherDescriptor
                    .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
                    .setUserPayload(userPayload)};
        TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
            TaskCommunicatorDescriptor
                    .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
                    .setUserPayload(userPayload)};

        // NOTE(review): the two boolean flags presumably enable regular containers and
        // disable uber mode - confirm against ServicePluginsDescriptor.create.
        ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(
                true, false,
                taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);

        TezConfiguration tezConf = new TezConfiguration(confForJobs);
        TezClient tezClient = TezClient.newBuilder("test1", tezConf)
                .setIsSession(true)
                .setServicePluginDescriptor(servicePluginsDescriptor)
                .build();
        try {
            tezClient.start();

            Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
            Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");

            // Generate both inputs plus the expected join result.
            JoinDataGen dataGen = new JoinDataGen();
            String[] dataGenArgs = new String[]{
                dataPath1.toString(), "1048576",
                dataPath2.toString(), "524288",
                HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"};
            Assert.assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezClient));

            // Produce the actual join output that the validation runs compare against.
            HashJoinExample joinExample = new HashJoinExample();
            String[] args = new String[]{
                dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()};
            Assert.assertEquals(0, joinExample.run(tezConf, args, tezClient));

            LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");
            // Neither job above used the external context, so nothing was submitted yet.
            Assert.assertEquals(0, tezTestServiceCluster.getNumSubmissions());

            runJoinValidate(tezClient, "allInExt", 7, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
                    EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
            LOG.info("Completed allInExt");

            runJoinValidate(tezClient, "noneInExt", 0, null, null, null);
            LOG.info("Completed noneInExt");

            runJoinValidate(tezClient, "lhsInExt", 2, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, null, null);
            LOG.info("Completed lhsInExt");
        } finally {
            tezClient.stop();
        }
    }

    /**
     * Runs one join-validation job and asserts both that it succeeds and that the external
     * cluster's submission counter grew by exactly {@code extExpectedCount} during the run.
     *
     * @param tezClient        started client used to run the job
     * @param name             name passed to the validation job
     * @param extExpectedCount expected increase of the cluster's submission count
     * @param lhsContext       execution context for the left-hand-side vertex, or {@code null}
     * @param rhsContext       execution context for the right-hand-side vertex, or {@code null}
     * @param validateContext  execution context for the validation vertex, or {@code null}
     * @throws Exception if the validation job fails to run
     */
    private void runJoinValidate(TezClient tezClient, String name, int extExpectedCount, Vertex.VertexExecutionContext lhsContext, Vertex.VertexExecutionContext rhsContext, Vertex.VertexExecutionContext validateContext) throws Exception {
        // Snapshot the counter first so only this run's submissions are measured.
        int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions();

        TezConfiguration tezConf = new TezConfiguration(confForJobs);
        // NOTE(review): the first constructor argument is left null - presumably a default
        // execution context; confirm against JoinValidateConfigured.
        JoinValidateConfigured joinValidate =
                new JoinValidateConfigured(null, lhsContext, rhsContext, validateContext, name);
        String[] validateArgs = new String[]{
            "-disableSplitGrouping",
            HASH_JOIN_EXPECTED_RESULT_PATH.toString(),
            HASH_JOIN_OUTPUT_PATH.toString(),
            "3"};

        Assert.assertEquals(0, joinValidate.run(tezConf, validateArgs, tezClient));
        Assert.assertEquals(extExpectedCount,
                tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount);
    }
}

