package org.apache.tez.examples;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;

/* loaded from: input_file:org/apache/tez/examples/JoinValidate.class */
public class JoinValidate extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(JoinValidate.class);
    private static final String LHS_INPUT_NAME = "lhsfile";
    private static final String RHS_INPUT_NAME = "rhsfile";
    private static final String COUNTER_GROUP_NAME = "JOIN_VALIDATE";
    private static final String MISSING_KEY_COUNTER_NAME = "MISSING_KEY_EXISTS";

    /* loaded from: input_file:org/apache/tez/examples/JoinValidate$JoinValidateProcessor.class */
    public static class JoinValidateProcessor extends SimpleProcessor {
        private static final Log LOG = LogFactory.getLog(JoinValidateProcessor.class);

        public JoinValidateProcessor(ProcessorContext processorContext) {
            super(processorContext);
        }

        public void run() throws Exception {
            Preconditions.checkState(getInputs().size() == 2);
            Preconditions.checkState(getOutputs().size() == 0);
            LogicalInput logicalInput = (LogicalInput) getInputs().get(JoinValidate.LHS_INPUT_NAME);
            LogicalInput logicalInput2 = (LogicalInput) getInputs().get(JoinValidate.RHS_INPUT_NAME);
            KeyValuesReader reader = logicalInput.getReader();
            KeyValuesReader reader2 = logicalInput2.getReader();
            Preconditions.checkState(reader instanceof KeyValuesReader);
            Preconditions.checkState(reader2 instanceof KeyValuesReader);
            KeyValuesReader keyValuesReader = reader;
            KeyValuesReader keyValuesReader2 = reader2;
            TezCounter findCounter = getContext().getCounters().findCounter(JoinValidate.COUNTER_GROUP_NAME, JoinValidate.MISSING_KEY_COUNTER_NAME);
            while (true) {
                if (!keyValuesReader.next()) {
                    break;
                }
                if (!keyValuesReader2.next()) {
                    findCounter.increment(1L);
                    LOG.info("ExtraKey in lhs: " + keyValuesReader.getClass());
                    break;
                } else if (!keyValuesReader.getCurrentKey().equals(keyValuesReader2.getCurrentKey())) {
                    LOG.info("MismatchedKeys: lhs=" + keyValuesReader.getCurrentKey() + ", rhs=" + keyValuesReader2.getCurrentKey());
                    findCounter.increment(1L);
                }
            }
            if (keyValuesReader2.next()) {
                findCounter.increment(1L);
                LOG.info("ExtraKey in rhs: " + keyValuesReader.getClass());
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new JoinValidate(), strArr));
    }

    private static void printUsage() {
        System.err.println("Usage: joinvalidate <path1> <path2>");
        ToolRunner.printGenericCommandUsage(System.err);
    }

    public int run(String[] strArr) throws Exception {
        String[] remainingArgs = new GenericOptionsParser(getConf(), strArr).getRemainingArgs();
        int validateArgs = validateArgs(remainingArgs);
        return validateArgs != 0 ? validateArgs : execute(remainingArgs);
    }

    public int run(Configuration configuration, String[] strArr, TezClient tezClient) throws Exception {
        setConf(configuration);
        String[] remainingArgs = new GenericOptionsParser(configuration, strArr).getRemainingArgs();
        int validateArgs = validateArgs(remainingArgs);
        return validateArgs != 0 ? validateArgs : execute(remainingArgs, tezClient);
    }

    private int validateArgs(String[] strArr) {
        if (strArr.length == 3 || strArr.length == 2) {
            return 0;
        }
        printUsage();
        return 2;
    }

    private int execute(String[] strArr) throws TezException, IOException, InterruptedException {
        TezConfiguration tezConfiguration = new TezConfiguration(getConf());
        TezClient tezClient = null;
        try {
            tezClient = createTezClient(tezConfiguration);
            int execute = execute(strArr, tezConfiguration, tezClient);
            if (tezClient != null) {
                tezClient.stop();
            }
            return execute;
        } catch (Throwable th) {
            if (tezClient != null) {
                tezClient.stop();
            }
            throw th;
        }
    }

    private int execute(String[] strArr, TezClient tezClient) throws IOException, TezException, InterruptedException {
        return execute(strArr, new TezConfiguration(getConf()), tezClient);
    }

    private TezClient createTezClient(TezConfiguration tezConfiguration) throws TezException, IOException {
        TezClient create = TezClient.create("JoinValidate", tezConfiguration);
        create.start();
        return create;
    }

    private int execute(String[] strArr, TezConfiguration tezConfiguration, TezClient tezClient) throws IOException, TezException, InterruptedException {
        LOG.info("Running JoinValidate");
        UserGroupInformation.setConfiguration(tezConfiguration);
        String str = strArr[0];
        String str2 = strArr[1];
        int i = 1;
        if (strArr.length == 3) {
            i = Integer.parseInt(strArr[2]);
        }
        if (i <= 0) {
            System.err.println("NumPartitions must be > 0");
            return 4;
        }
        DAG createDag = createDag(tezConfiguration, new Path(str), new Path(str2), i);
        tezClient.waitTillReady();
        DAGClient submitDAG = tezClient.submitDAG(createDag);
        DAGStatus waitForCompletionWithStatusUpdates = submitDAG.waitForCompletionWithStatusUpdates((Set) null);
        if (waitForCompletionWithStatusUpdates.getState() != DAGStatus.State.SUCCEEDED) {
            LOG.info("DAG diagnostics: " + waitForCompletionWithStatusUpdates.getDiagnostics());
            return -1;
        }
        TezCounter findCounter = submitDAG.getDAGStatus(Sets.newHashSet(new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS})).getDAGCounters().findCounter(COUNTER_GROUP_NAME, MISSING_KEY_COUNTER_NAME);
        if (findCounter == null) {
            LOG.info("Unable to determing equality");
            return -2;
        }
        if (findCounter.getValue() != 0) {
            LOG.info("Validate failed. The two sides are not equivalent");
            return -3;
        }
        LOG.info("Validation successful. The two sides are equivalent");
        return 0;
    }

    private DAG createDag(TezConfiguration tezConfiguration, Path path, Path path2, int i) throws IOException {
        DAG create = DAG.create("JoinValidate");
        OrderedPartitionedKVEdgeConfig build = OrderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), NullWritable.class.getName(), HashPartitioner.class.getName()).build();
        Vertex addDataSource = Vertex.create(LHS_INPUT_NAME, ProcessorDescriptor.create(HashJoinExample.ForwardingProcessor.class.getName())).addDataSource("lhs", MRInput.createConfigBuilder(new Configuration(tezConfiguration), TextInputFormat.class, path.toUri().toString()).groupSplits(false).build());
        Vertex addDataSource2 = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(HashJoinExample.ForwardingProcessor.class.getName())).addDataSource("rhs", MRInput.createConfigBuilder(new Configuration(tezConfiguration), TextInputFormat.class, path2.toUri().toString()).groupSplits(false).build());
        Vertex create2 = Vertex.create("joinvalidate", ProcessorDescriptor.create(JoinValidateProcessor.class.getName()), i);
        Edge create3 = Edge.create(addDataSource, create2, build.createDefaultEdgeProperty());
        create.addVertex(addDataSource).addVertex(addDataSource2).addVertex(create2).addEdge(create3).addEdge(Edge.create(addDataSource2, create2, build.createDefaultEdgeProperty()));
        return create;
    }
}
