package org.apache.tez.test;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.HashSet;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Tool;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/tez/test/TestPipelinedShuffle.class */
public class TestPipelinedShuffle {
    private static MiniDFSCluster miniDFSCluster;
    private static MiniTezCluster miniTezCluster;
    private static FileSystem fs;
    private static final int KEYS_PER_MAPPER = 5000;
    private static Configuration conf = new Configuration(false);
    private static String TEST_ROOT_DIR = "target/" + TestPipelinedShuffle.class.getName() + "-tmpDir";

    /* loaded from: input_file:org/apache/tez/test/TestPipelinedShuffle$PipelinedShuffleJob.class */
    public static class PipelinedShuffleJob extends Configured implements Tool {
        private TezConfiguration tezConf;

        /* loaded from: input_file:org/apache/tez/test/TestPipelinedShuffle$PipelinedShuffleJob$DataGenerator.class */
        public static class DataGenerator extends SimpleMRProcessor {
            public DataGenerator(ProcessorContext processorContext) {
                super(processorContext);
            }

            public void run() throws Exception {
                Preconditions.checkArgument(getInputs().size() == 0);
                Preconditions.checkArgument(getOutputs().size() == 1);
                KeyValueWriter writer = ((LogicalOutput) getOutputs().get("reducer")).getWriter();
                for (int i = 0; i < TestPipelinedShuffle.KEYS_PER_MAPPER; i++) {
                    writer.write(new Text(RandomStringUtils.randomAlphanumeric(1000)), new Text(RandomStringUtils.randomAlphanumeric(1000)));
                }
            }
        }

        /* loaded from: input_file:org/apache/tez/test/TestPipelinedShuffle$PipelinedShuffleJob$SimpleReduceProcessor.class */
        public static class SimpleReduceProcessor extends SimpleMRProcessor {
            public SimpleReduceProcessor(ProcessorContext processorContext) {
                super(processorContext);
            }

            private long readData(KeyValuesReader keyValuesReader) throws IOException {
                long j = 0;
                while (keyValuesReader.next()) {
                    keyValuesReader.getCurrentKey();
                    for (Object obj : keyValuesReader.getCurrentValues()) {
                        j++;
                    }
                }
                return j;
            }

            public void run() throws Exception {
                Preconditions.checkArgument(getInputs().size() == 2);
                Assert.assertEquals(10000L, 0 + readData((KeyValuesReader) ((LogicalInput) getInputs().get("mapper1")).getReader()) + readData((KeyValuesReader) ((LogicalInput) getInputs().get("mapper2")).getReader()));
            }
        }

        public int run(String[] strArr) throws Exception {
            this.tezConf = new TezConfiguration(getConf());
            DAG create = DAG.create("pipelinedShuffleTest");
            Vertex create2 = Vertex.create("mapper1", ProcessorDescriptor.create(DataGenerator.class.getName()), 1);
            Vertex create3 = Vertex.create("mapper2", ProcessorDescriptor.create(DataGenerator.class.getName()), 1);
            Vertex create4 = Vertex.create("reducer", ProcessorDescriptor.create(SimpleReduceProcessor.class.getName()), 1);
            Edge create5 = Edge.create(create2, create4, OrderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), Text.class.getName(), HashPartitioner.class.getName()).setFromConfiguration(this.tezConf).build().createDefaultEdgeProperty());
            Edge create6 = Edge.create(create3, create4, OrderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), Text.class.getName(), HashPartitioner.class.getName()).setFromConfiguration(this.tezConf).build().createDefaultEdgeProperty());
            create.addVertex(create2);
            create.addVertex(create3);
            create.addVertex(create4);
            create.addEdge(create5).addEdge(create6);
            TezClient create7 = TezClient.create("pipelinedShuffleTest", this.tezConf);
            create7.start();
            create7.waitTillReady();
            DAGClient submitDAG = create7.submitDAG(create);
            HashSet newHashSet = Sets.newHashSet();
            newHashSet.add(StatusGetOpts.GET_COUNTERS);
            DAGStatus waitForCompletionWithStatusUpdates = submitDAG.waitForCompletionWithStatusUpdates(newHashSet);
            System.out.println(waitForCompletionWithStatusUpdates.getDAGCounters());
            Assert.assertTrue(waitForCompletionWithStatusUpdates.getDAGCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT).getValue() > 10);
            if (waitForCompletionWithStatusUpdates.getState() == DAGStatus.State.SUCCEEDED) {
                return 0;
            }
            System.out.println("DAG diagnostics: " + waitForCompletionWithStatusUpdates.getDiagnostics());
            return -1;
        }
    }

    @BeforeClass
    public static void setupDFSCluster() throws Exception {
        conf = new Configuration(false);
        conf.setBoolean("dfs.namenode.edits.noeditlogchannelflush", false);
        conf.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
        conf.set("fs.AbstractFileSystem.file.impl", "org.apache.hadoop.fs.local.LocalFs");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
        conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
        miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
        fs = miniDFSCluster.getFileSystem();
        conf.set("fs.defaultFS", fs.getUri().toString());
        conf.setBoolean("tez.runtime.optimize.local.fetch", false);
    }

    @AfterClass
    public static void shutdownDFSCluster() {
        if (miniDFSCluster != null) {
            miniDFSCluster.shutdown();
        }
    }

    @Before
    public void setupTezCluster() throws Exception {
        conf.setInt("tez.runtime.io.sort.mb", 1);
        conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        conf.setBoolean("tez.runtime.optimize.local.fetch", true);
        conf.setInt("tez.runtime.shuffle.connect.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.read.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.fetch.failures.limit", 2);
        conf.setLong("tez.am.sleep.time.before.exit.millis", 500L);
        miniTezCluster = new MiniTezCluster(TestPipelinedShuffle.class.getName(), 1, 1, 1);
        miniTezCluster.init(conf);
        miniTezCluster.start();
    }

    @After
    public void shutdownTezCluster() throws IOException {
        if (miniTezCluster != null) {
            miniTezCluster.stop();
        }
    }

    @Test
    public void baseTest() throws Exception {
        Configuration configuration = new Configuration(miniTezCluster.getConfig());
        configuration.setBoolean("tez.runtime.shuffle.use.async.http", false);
        test(configuration);
        Configuration configuration2 = new Configuration(miniTezCluster.getConfig());
        configuration2.setBoolean("tez.runtime.shuffle.use.async.http", true);
        test(configuration2);
    }

    private void test(Configuration configuration) throws Exception {
        new PipelinedShuffleJob().setConf(configuration);
        Assert.assertEquals(0L, r0.run(new String[0]));
    }
}
