package org.apache.tez.examples;

import com.google.common.base.Preconditions;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/examples/SortMergeJoinExample.class */
public class SortMergeJoinExample extends TezExampleBase {
    private static final Logger LOG = LoggerFactory.getLogger(SortMergeJoinExample.class);
    private static final String input1 = "input1";
    private static final String input2 = "input2";
    private static final String inputFile = "inputFile";
    private static final String joiner = "joiner";
    private static final String joinOutput = "joinOutput";

    /* loaded from: input_file:org/apache/tez/examples/SortMergeJoinExample$SortMergeJoinProcessor.class */
    public static class SortMergeJoinProcessor extends SimpleMRProcessor {
        public SortMergeJoinProcessor(ProcessorContext processorContext) {
            super(processorContext);
        }

        public void run() throws Exception {
            Preconditions.checkState(getInputs().size() == 2);
            Preconditions.checkState(getOutputs().size() == 1);
            LogicalInput logicalInput = (LogicalInput) getInputs().get(SortMergeJoinExample.input1);
            LogicalInput logicalInput2 = (LogicalInput) getInputs().get(SortMergeJoinExample.input2);
            Reader reader = logicalInput.getReader();
            Reader reader2 = logicalInput2.getReader();
            Preconditions.checkState(reader instanceof KeyValuesReader);
            Preconditions.checkState(reader2 instanceof KeyValuesReader);
            LogicalOutput logicalOutput = (LogicalOutput) getOutputs().get(SortMergeJoinExample.joinOutput);
            Preconditions.checkState(logicalOutput.getWriter() instanceof KeyValueWriter);
            join((KeyValuesReader) reader, (KeyValuesReader) reader2, (KeyValueWriter) logicalOutput.getWriter());
        }

        private void join(KeyValuesReader keyValuesReader, KeyValuesReader keyValuesReader2, KeyValueWriter keyValueWriter) throws IOException {
            while (keyValuesReader.next() && keyValuesReader2.next()) {
                Text text = (Text) keyValuesReader.getCurrentKey();
                Text text2 = (Text) keyValuesReader2.getCurrentKey();
                boolean z = false;
                while (true) {
                    if (text.compareTo(text2) != 0) {
                        if (text.compareTo(text2) <= 0) {
                            if (!keyValuesReader.next()) {
                                z = true;
                                break;
                            }
                            text = (Text) keyValuesReader.getCurrentKey();
                        } else {
                            if (!keyValuesReader2.next()) {
                                z = true;
                                break;
                            }
                            text2 = (Text) keyValuesReader2.getCurrentKey();
                        }
                    } else {
                        break;
                    }
                }
                if (z) {
                    return;
                } else {
                    keyValueWriter.write(text, NullWritable.get());
                }
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new SortMergeJoinExample(), strArr));
    }

    @Override // org.apache.tez.examples.TezExampleBase
    protected void printUsage() {
        System.err.println("Usage: sortmergejoin <file1> <file2> <numPartitions> <outPath>");
    }

    @Override // org.apache.tez.examples.TezExampleBase
    protected int runJob(String[] strArr, TezConfiguration tezConfiguration, TezClient tezClient) throws Exception {
        String str = strArr[0];
        String str2 = strArr[1];
        int parseInt = Integer.parseInt(strArr[2]);
        String str3 = strArr[3];
        Path path = new Path(str);
        Path path2 = new Path(str2);
        Path path3 = new Path(str3);
        if (FileSystem.get(tezConfiguration).exists(path3)) {
            System.err.println("Output directory: " + str3 + " already exists");
            return 3;
        }
        if (parseInt <= 0) {
            System.err.println("NumPartitions must be > 0");
            return 4;
        }
        DAG createDag = createDag(tezConfiguration, path, path2, path3, parseInt);
        LOG.info("Running SortMergeJoinExample");
        return runDag(createDag, isCountersLog(), LOG);
    }

    @Override // org.apache.tez.examples.TezExampleBase
    protected int validateArgs(String[] strArr) {
        return strArr.length != 4 ? 2 : 0;
    }

    private DAG createDag(TezConfiguration tezConfiguration, Path path, Path path2, Path path3, int i) throws IOException {
        DAG create = DAG.create("SortMergeJoinExample");
        Vertex addDataSource = Vertex.create(input1, ProcessorDescriptor.create(HashJoinExample.ForwardingProcessor.class.getName())).addDataSource(inputFile, MRInput.createConfigBuilder(new Configuration(tezConfiguration), TextInputFormat.class, path.toUri().toString()).groupSplits(!isDisableSplitGrouping()).generateSplitsInAM(!isGenerateSplitInClient()).build());
        Vertex addDataSource2 = Vertex.create(input2, ProcessorDescriptor.create(HashJoinExample.ForwardingProcessor.class.getName())).addDataSource(inputFile, MRInput.createConfigBuilder(new Configuration(tezConfiguration), TextInputFormat.class, path2.toUri().toString()).groupSplits(!isDisableSplitGrouping()).generateSplitsInAM(!isGenerateSplitInClient()).build());
        Vertex addDataSink = Vertex.create(joiner, ProcessorDescriptor.create(SortMergeJoinProcessor.class.getName()), i).setVertexManagerPlugin(ShuffleVertexManager.createConfigBuilder(tezConfiguration).setAutoReduceParallelism(true).build()).addDataSink(joinOutput, MROutput.createConfigBuilder(new Configuration(tezConfiguration), TextOutputFormat.class, path3.toUri().toString()).build());
        OrderedPartitionedKVEdgeConfig build = OrderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), NullWritable.class.getName(), HashPartitioner.class.getName()).setFromConfiguration(tezConfiguration).build();
        create.addVertex(addDataSource).addVertex(addDataSource2).addVertex(addDataSink).addEdge(Edge.create(addDataSource, addDataSink, build.createDefaultEdgeProperty())).addEdge(Edge.create(addDataSource2, addDataSink, build.createDefaultEdgeProperty()));
        return create;
    }
}
