package org.apache.tez.test;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/tez/test/TestLocalMode.class */
public class TestLocalMode {
    private static final File STAGING_DIR = new File(System.getProperty("test.build.data"), TestLocalMode.class.getName());
    private static MiniDFSCluster dfsCluster;
    private static FileSystem remoteFs;
    private final boolean useDfs;
    private final boolean useLocalModeWithoutNetwork;

    /* loaded from: input_file:org/apache/tez/test/TestLocalMode$FailingProcessor.class */
    public static class FailingProcessor extends AbstractLogicalIOProcessor {
        public FailingProcessor(ProcessorContext processorContext) {
            super(processorContext);
        }

        public void initialize() throws Exception {
        }

        public void handleEvents(List<Event> list) {
        }

        public void close() throws Exception {
        }

        public void run(Map<String, LogicalInput> map, Map<String, LogicalOutput> map2) throws Exception {
            throw new TezException("FailingProcessor");
        }
    }

    @Parameterized.Parameters(name = "useDFS:{0} useLocalModeWithoutNetwork:{1}")
    public static Collection<Object[]> params() {
        return Arrays.asList(new Object[]{false, false}, new Object[]{true, false}, new Object[]{false, true}, new Object[]{true, true});
    }

    public TestLocalMode(boolean z, boolean z2) {
        this.useDfs = z;
        this.useLocalModeWithoutNetwork = z2;
    }

    @BeforeClass
    public static void beforeClass() throws Exception {
        try {
            dfsCluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(3).format(true).racks((String[]) null).build();
            remoteFs = dfsCluster.getFileSystem();
        } catch (IOException e) {
            throw new RuntimeException("problem starting mini dfs cluster", e);
        }
    }

    @AfterClass
    public static void afterClass() throws InterruptedException {
        if (dfsCluster != null) {
            try {
                dfsCluster.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private TezConfiguration createConf() {
        TezConfiguration tezConfiguration = new TezConfiguration();
        tezConfiguration.setBoolean("tez.local.mode", true);
        tezConfiguration.setBoolean("tez.local.mode.without.network", this.useLocalModeWithoutNetwork);
        if (this.useDfs) {
            tezConfiguration.set("fs.defaultFS", remoteFs.getUri().toString());
        } else {
            tezConfiguration.set("fs.defaultFS", "file:///");
        }
        tezConfiguration.set("tez.staging-dir", STAGING_DIR.getAbsolutePath());
        tezConfiguration.setBoolean("tez.runtime.optimize.local.fetch", true);
        return tezConfiguration;
    }

    @Test(timeout = 30000)
    public void testMultipleClientsWithSession() throws TezException, InterruptedException, IOException {
        TezClient create = TezClient.create("commonName", createConf(), true);
        create.start();
        DAGClient submitDAG = create.submitDAG(createSimpleDAG("dag1", SleepProcessor.class.getName()));
        submitDAG.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.getDAGStatus((Set) null).getState());
        submitDAG.close();
        create.stop();
        TezConfiguration createConf = createConf();
        DAG createSimpleDAG = createSimpleDAG("dag2", SleepProcessor.class.getName());
        TezClient create2 = TezClient.create("commonName", createConf, true);
        create2.start();
        DAGClient submitDAG2 = create2.submitDAG(createSimpleDAG);
        submitDAG2.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG2.getDAGStatus((Set) null).getState());
        Assert.assertFalse(submitDAG.getExecutionContext().equals(submitDAG2.getExecutionContext()));
        submitDAG2.close();
        create2.stop();
    }

    @Test(timeout = 10000)
    public void testMultipleClientsWithoutSession() throws TezException, InterruptedException, IOException {
        TezClient create = TezClient.create("commonName", createConf(), false);
        create.start();
        DAGClient submitDAG = create.submitDAG(createSimpleDAG("dag1", SleepProcessor.class.getName()));
        submitDAG.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.getDAGStatus((Set) null).getState());
        submitDAG.close();
        create.stop();
        TezConfiguration createConf = createConf();
        DAG createSimpleDAG = createSimpleDAG("dag2", SleepProcessor.class.getName());
        TezClient create2 = TezClient.create("commonName", createConf, false);
        create2.start();
        DAGClient submitDAG2 = create2.submitDAG(createSimpleDAG);
        submitDAG2.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG2.getDAGStatus((Set) null).getState());
        Assert.assertFalse(submitDAG.getExecutionContext().equals(submitDAG2.getExecutionContext()));
        submitDAG2.close();
        create2.stop();
    }

    @Test(timeout = 20000)
    public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException, IOException {
        TezClient create = TezClient.create("commonName", createConf(), false);
        create.start();
        DAGClient submitDAG = create.submitDAG(createSimpleDAG("dag1", SleepProcessor.class.getName()));
        submitDAG.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, submitDAG.getDAGStatus((Set) null).getState());
        Thread.sleep(7500L);
        submitDAG.close();
        create.stop();
    }

    @Test(timeout = 20000)
    public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException, IOException {
        TezClient create = TezClient.create("commonName", createConf(), false);
        create.start();
        DAGClient submitDAG = create.submitDAG(createSimpleDAG("dag1", FailingProcessor.class.getName()));
        submitDAG.waitForCompletion();
        Assert.assertEquals(DAGStatus.State.FAILED, submitDAG.getDAGStatus((Set) null).getState());
        Thread.sleep(7500L);
        submitDAG.close();
        create.stop();
    }

    private DAG createSimpleDAG(String str, String str2) {
        return DAG.create(str).addVertex(Vertex.create("Sleep", ProcessorDescriptor.create(str2).setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1));
    }

    @Test(timeout = 30000)
    public void testMultiDAGsOnSession() throws IOException, TezException, InterruptedException {
        String[] strArr = new String[2];
        String[] strArr2 = new String[2];
        DAGClient[] dAGClientArr = new DAGClient[2];
        TezConfiguration createConf = createConf();
        TezClient create = TezClient.create("testMultiDAGOnSession", createConf, true);
        create.start();
        FileSystem fileSystem = FileSystem.get(createConf);
        for (int i = 0; i < 2; i++) {
            strArr[i] = new Path(STAGING_DIR.getAbsolutePath(), "in-" + i).toString();
            createInputFile(fileSystem, strArr[i]);
            strArr2[i] = new Path(STAGING_DIR.getAbsolutePath(), "out-" + i).toString();
        }
        for (int i2 = 0; i2 < strArr.length; i2++) {
            try {
                DAG createDAG = OrderedWordCount.createDAG(createConf, strArr[i2], strArr2[i2], 1, false, false, "DAG-Iteration-" + i2);
                create.waitTillReady();
                System.out.println("Running dag number " + i2);
                dAGClientArr[i2] = create.submitDAG(createDAG);
                DAGStatus waitForCompletion = dAGClientArr[i2].waitForCompletion();
                if (waitForCompletion.getState() != DAGStatus.State.SUCCEEDED) {
                    Assert.fail("Iteration " + i2 + " failed with diagnostics: " + waitForCompletion.getDiagnostics());
                }
                if (i2 > 0) {
                    Assert.assertTrue(dAGClientArr[i2 - 1].getExecutionContext().equals(dAGClientArr[i2].getExecutionContext()));
                }
            } finally {
                create.stop();
            }
        }
    }

    private void createInputFile(FileSystem fileSystem, String str) throws IOException {
        try {
            FSDataOutputStream create = fileSystem.create(new Path(new Path(str), "input.txt"));
            create.write("This is a small test file !".getBytes());
            create.flush();
            create.close();
        } catch (IOException e) {
            Assert.fail("Can not create input File!");
        }
    }
}
