package org.apache.tez.tests;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.service.MiniTezTestServiceCluster;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/tests/TestExtServicesWithLocalMode.class */
/**
 * Runs join example DAGs through a {@link TezClient} configured for Tez local mode
 * ({@code tez.local.mode=true}) while a {@link MiniTezTestServiceCluster} is available as an
 * external execution service, and checks how many submissions reach that external service
 * depending on which vertices are assigned to it.
 */
public class TestExtServicesWithLocalMode {
    private static final Logger LOG = LoggerFactory.getLogger(TestExtServicesWithLocalMode.class);

    /** Scratch directory for all data produced by this test class; removed in {@link #tearDown()}. */
    private static final String TEST_ROOT_DIR =
            "target/" + TestExtServicesWithLocalMode.class.getName() + "-tmpDir";
    private static final Path SRC_DATA_DIR = new Path(TEST_ROOT_DIR + "/data");
    /** Where the data generator writes the expected join result. */
    private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
    /** Where the hash-join example writes its actual result. */
    private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");

    /** Name under which the scheduler, launcher and communicator plugins are all registered. */
    private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
    /** Execution context that routes a vertex to the plugins registered under {@link #EXT_PUSH_ENTITY_NAME}. */
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
            Vertex.VertexExecutionContext.create(
                    EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);

    private static volatile Configuration clusterConf = new Configuration(false);
    private static volatile FileSystem localFs;
    private static volatile MiniTezTestServiceCluster tezTestServiceCluster;
    /** Cluster configuration plus the cluster-specific entries and the local-mode flag. */
    private static volatile Configuration confForJobs;

    /**
     * Starts the mini test-service cluster and builds the configuration used by the jobs.
     *
     * @throws Exception if the local file system or the cluster cannot be initialised
     */
    @BeforeClass
    public static void setup() throws Exception {
        localFs = FileSystem.getLocal(clusterConf).getRaw();

        // Half of the JVM's maximum heap is handed to the test service cluster.
        long serviceMemory = (long) (Runtime.getRuntime().maxMemory() * 0.5d);
        // NOTE(review): the cluster is named after TestExternalTezServices rather than this
        // class - looks like a copy/paste leftover; kept as-is since it is only a name.
        tezTestServiceCluster = MiniTezTestServiceCluster.create(
                TestExternalTezServices.class.getSimpleName(), 3, serviceMemory, 1);
        tezTestServiceCluster.init(clusterConf);
        tezTestServiceCluster.start();
        LOG.info("MiniTezTestServer started");

        confForJobs = new Configuration(clusterConf);
        // Copy every cluster-specific setting into the job configuration.
        for (Map.Entry<String, String> entry : tezTestServiceCluster.getClusterSpecificConfiguration()) {
            confForJobs.set(entry.getKey(), entry.getValue());
        }
        confForJobs.setBoolean("tez.local.mode", true);
    }

    /**
     * Stops the mini cluster (if it was started) and deletes the scratch directory.
     *
     * @throws IOException if the scratch directory cannot be deleted
     * @throws TezException declared for compatibility with the original signature
     */
    @AfterClass
    public static void tearDown() throws IOException, TezException {
        try {
            if (tezTestServiceCluster != null) {
                tezTestServiceCluster.stop();
                tezTestServiceCluster = null;
            }
        } finally {
            // localFs stays null when setup() failed before assigning it; nothing to clean then.
            if (localFs != null) {
                Path testRootDir = localFs.makeQualified(new Path(TEST_ROOT_DIR));
                LOG.info("CLeaning up path: " + testRootDir);
                localFs.delete(testRootDir, true);
            }
        }
    }

    /**
     * Generates join input data, runs the hash join, then validates the join result three times
     * with different vertex placements, asserting the number of submissions observed by the
     * external service cluster for each placement.
     *
     * @throws Exception if any of the DAGs fails to run
     */
    @Test(timeout = 300000)
    public void test1() throws Exception {
        UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs);

        TaskSchedulerDescriptor[] taskSchedulers = new TaskSchedulerDescriptor[]{
                TaskSchedulerDescriptor
                        .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
                        .setUserPayload(userPayload)};
        ContainerLauncherDescriptor[] containerLaunchers = new ContainerLauncherDescriptor[]{
                ContainerLauncherDescriptor
                        .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
                        .setUserPayload(userPayload)};
        TaskCommunicatorDescriptor[] taskCommunicators = new TaskCommunicatorDescriptor[]{
                TaskCommunicatorDescriptor
                        .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
                        .setUserPayload(userPayload)};
        ServicePluginsDescriptor servicePlugins = ServicePluginsDescriptor.create(
                true, false, taskSchedulers, containerLaunchers, taskCommunicators);

        // One configuration instance shared by the client and by every example run below.
        TezConfiguration tezConf = new TezConfiguration(confForJobs);
        TezClient tezClient = TezClient.newBuilder("test1", tezConf)
                .setIsSession(true)
                .setServicePluginDescriptor(servicePlugins)
                .build();
        try {
            tezClient.start();

            Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
            Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");

            String[] dataGenArgs = new String[]{
                    dataPath1.toString(), "1048576",
                    dataPath2.toString(), "524288",
                    HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"};
            Assert.assertEquals(0, new JoinDataGen().run(tezConf, dataGenArgs, tezClient));

            String[] hashJoinArgs = new String[]{
                    dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()};
            Assert.assertEquals(0, new HashJoinExample().run(tezConf, hashJoinArgs, tezClient));
            LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");

            // Neither of the preparatory DAGs requested the external service.
            Assert.assertEquals(0, tezTestServiceCluster.getNumSubmissions());

            runJoinValidate(tezClient, "allInExt", 7, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
                    EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
            LOG.info("Completed allInExt");

            runJoinValidate(tezClient, "noneInExt", 0, null, null, null);
            LOG.info("Completed noneInExt");

            runJoinValidate(tezClient, "lhsInExt", 2, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, null, null);
            LOG.info("Completed lhsInExt");
        } finally {
            // Always release the session, whether the assertions passed or not.
            tezClient.stop();
        }
    }

    /**
     * Runs {@link JoinValidateConfigured} against the expected and actual join outputs and checks
     * both its exit code and the number of submissions it caused on the external service cluster.
     *
     * @param tezClient started client used to run the validation DAG
     * @param name fifth constructor argument of {@link JoinValidateConfigured}; the callers pass a
     *     short label for the placement scenario
     * @param extExpectedCount expected increase of
     *     {@link MiniTezTestServiceCluster#getNumSubmissions()} caused by this run
     * @param lhsContext second constructor argument of {@link JoinValidateConfigured}
     *     (presumably the left-hand-side vertex context - confirm against that class); may be null
     * @param rhsContext third constructor argument (presumably right-hand side); may be null
     * @param validateContext fourth constructor argument (presumably the validating vertex);
     *     may be null
     * @throws Exception if the validation DAG fails to run
     */
    private void runJoinValidate(TezClient tezClient, String name, int extExpectedCount,
            Vertex.VertexExecutionContext lhsContext,
            Vertex.VertexExecutionContext rhsContext,
            Vertex.VertexExecutionContext validateContext) throws Exception {
        int submissionsBefore = tezTestServiceCluster.getNumSubmissions();

        JoinValidateConfigured joinValidate =
                new JoinValidateConfigured(null, lhsContext, rhsContext, validateContext, name);
        String[] validateArgs = new String[]{
                "-disableSplitGrouping",
                HASH_JOIN_EXPECTED_RESULT_PATH.toString(),
                HASH_JOIN_OUTPUT_PATH.toString(),
                "3"};
        Assert.assertEquals(0, joinValidate.run(new TezConfiguration(confForJobs), validateArgs, tezClient));

        // Only the submissions made by this particular run are compared.
        Assert.assertEquals(extExpectedCount, tezTestServiceCluster.getNumSubmissions() - submissionsBefore);
    }
}
