/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.examples;

import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;

public class JoinDataGen
extends Configured
implements Tool {
    private static final Log LOG = LogFactory.getLog(JoinDataGen.class);
    private static final String STREAM_OUTPUT_NAME = "streamoutput";
    private static final String HASH_OUTPUT_NAME = "hashoutput";
    private static final String EXPECTED_OUTPUT_NAME = "expectedoutput";

    /**
     * Command-line entry point. Runs a new {@code JoinDataGen} through
     * {@link ToolRunner} with a fresh {@link Configuration} and terminates the
     * JVM with the status code the tool returned.
     *
     * @param args raw command-line arguments (generic options plus positional ones)
     * @throws Exception whatever the tool run propagates
     */
    public static void main(String[] args) throws Exception {
        Tool tool = new JoinDataGen();
        Configuration baseConf = new Configuration();
        int exitCode = ToolRunner.run(baseConf, tool, args);
        System.exit(exitCode);
    }

    /**
     * Writes the expected positional arguments to standard error, followed by
     * the generic command usage provided by {@link ToolRunner}.
     */
    private static void printUsage() {
        PrintStream err = System.err;
        err.println("Usage: joindatagen <outPath1> <path1Size> <outPath2> <path2Size> <expectedResultPath> <numTasks>");
        ToolRunner.printGenericCommandUsage(err);
    }

    /**
     * Runs the generator using the configuration already set on this tool.
     * Generic options are stripped from {@code args}; the remaining arguments
     * are validated and, if valid, the job is executed with a Tez client that
     * is created for this call.
     *
     * @param args command-line arguments, possibly including generic options
     * @return the validation code if validation fails, otherwise the execution result
     * @throws Exception on argument parsing or execution failure
     */
    public int run(String[] args) throws Exception {
        String[] remaining = new GenericOptionsParser(this.getConf(), args).getRemainingArgs();
        int validation = this.validateArgs(remaining);
        return validation == 0 ? this.execute(remaining) : validation;
    }

    /**
     * Runs the generator with a caller-supplied configuration and Tez client.
     * The configuration is installed on this tool before the arguments are
     * parsed; the supplied client is passed through to execution as-is.
     *
     * @param conf      configuration to install on this tool and use for option parsing
     * @param args      command-line arguments, possibly including generic options
     * @param tezClient client used to submit the DAG
     * @return the validation code if validation fails, otherwise the execution result
     * @throws Exception on argument parsing or execution failure
     */
    public int run(Configuration conf, String[] args, TezClient tezClient) throws Exception {
        this.setConf(conf);
        String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
        int validation = this.validateArgs(remaining);
        return validation == 0 ? this.execute(remaining, tezClient) : validation;
    }

    /**
     * Checks that exactly six positional arguments were supplied.
     *
     * @param otherArgs arguments remaining after generic options were removed
     * @return 0 when the count is correct; 2 (after printing usage) otherwise
     */
    private int validateArgs(String[] otherArgs) {
        if (otherArgs.length == 6) {
            return 0;
        }
        JoinDataGen.printUsage();
        return 2;
    }

    /**
     * Executes the job with a Tez client owned by this call: a
     * {@link TezConfiguration} is derived from the tool's configuration, a
     * client is created and started, and the client is stopped once execution
     * finishes, whether it returns normally or throws.
     *
     * @param args validated positional arguments
     * @return the status code produced by the execution
     */
    private int execute(String[] args) throws TezException, IOException, InterruptedException {
        TezConfiguration tezConf = new TezConfiguration(this.getConf());
        // If creation or start fails there is no client reference to stop,
        // so the exception simply propagates.
        TezClient ownedClient = this.createTezClient(tezConf);
        try {
            return this.execute(args, tezConf, ownedClient);
        }
        finally {
            ownedClient.stop();
        }
    }

    /**
     * Executes the job with an externally managed Tez client. A fresh
     * {@link TezConfiguration} is derived from the tool's configuration; the
     * client is not stopped here.
     *
     * @param args      validated positional arguments
     * @param tezClient client used to submit the DAG
     * @return the status code produced by the execution
     */
    private int execute(String[] args, TezClient tezClient) throws IOException, TezException, InterruptedException {
        return this.execute(args, new TezConfiguration(this.getConf()), tezClient);
    }

    /**
     * Creates a Tez client named {@code "JoinDataGen"} and starts it.
     *
     * @param tezConf configuration handed to the client
     * @return the started client
     */
    private TezClient createTezClient(TezConfiguration tezConf) throws TezException, IOException {
        TezClient client = TezClient.create("JoinDataGen", tezConf);
        client.start();
        return client;
    }

    /**
     * Parses the positional arguments, verifies the output locations, builds
     * the data-generation DAG, submits it and waits for completion.
     *
     * <p>Argument layout: {@code [0]} first output dir, {@code [1]} its size,
     * {@code [2]} second output dir, {@code [3]} its size, {@code [4]} expected
     * result dir, {@code [5]} number of tasks. Whichever of the two outputs has
     * the greater size (the first one on a tie) is treated as the large output.
     *
     * @param args      validated positional arguments
     * @param tezConf   configuration used for security setup, filesystem access and the DAG
     * @param tezClient client used to submit the DAG
     * @return 0 on success; 3 if any output directory already exists; 4 if
     *         the task count is not positive; -1 if the DAG did not succeed
     */
    private int execute(String[] args, TezConfiguration tezConf, TezClient tezClient) throws IOException, TezException, InterruptedException {
        LOG.info("Running JoinDataGen");
        UserGroupInformation.setConfiguration(tezConf);

        // All numeric arguments are parsed before any Path is constructed.
        String firstDir = args[0];
        long firstSize = Long.parseLong(args[1]);
        String secondDir = args[2];
        long secondSize = Long.parseLong(args[3]);
        String expectedDir = args[4];
        int numTasks = Integer.parseInt(args[5]);

        boolean firstIsLarge = firstSize >= secondSize;
        Path largeOutPath = new Path(firstIsLarge ? firstDir : secondDir);
        long largeOutSize = firstIsLarge ? firstSize : secondSize;
        Path smallOutPath = new Path(firstIsLarge ? secondDir : firstDir);
        long smallOutSize = firstIsLarge ? secondSize : firstSize;
        Path expectedOutputPath = new Path(expectedDir);

        FileSystem fs = FileSystem.get(tezConf);
        // Every directory is checked (no short-circuit) so each existing one is reported.
        int existing = 0;
        for (Path candidate : new Path[]{largeOutPath, smallOutPath, expectedOutputPath}) {
            existing += this.checkOutputDirectory(fs, candidate);
        }
        if (existing != 0) {
            return 3;
        }
        if (numTasks <= 0) {
            System.err.println("NumTasks must be > 0");
            return 4;
        }

        DAG dag = this.createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks, largeOutSize, smallOutSize);
        tezClient.waitTillReady();
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGStatus finalStatus = dagClient.waitForCompletionWithStatusUpdates(null);
        if (finalStatus.getState() == DAGStatus.State.SUCCEEDED) {
            return 0;
        }
        LOG.info("DAG diagnostics: " + finalStatus.getDiagnostics());
        return -1;
    }

    /**
     * Builds the {@code "JoinDataGen"} DAG: a single {@code "datagen"} vertex
     * running {@link GenDataProcessor} with {@code numTasks} tasks and three
     * text data sinks (stream, hash and expected output).
     *
     * <p>Each task receives the total sizes divided by {@code numTasks}
     * (integer division) through the processor's user payload.
     *
     * @param tezConf            base configuration copied for each sink
     * @param largeOutPath       destination of the stream (large) output
     * @param smallOutPath       destination of the hash (small) output
     * @param expectedOutputPath destination of the expected join result
     * @param numTasks           parallelism of the vertex and divisor for the sizes
     * @param largeOutSize       total target size of the large output
     * @param smallOutSize       total target size of the small output
     * @return the assembled DAG
     * @throws IOException if the processor payload cannot be serialized
     */
    private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath, Path expectedOutputPath, int numTasks, long largeOutSize, long smallOutSize) throws IOException {
        long perTaskLarge = largeOutSize / (long)numTasks;
        long perTaskSmall = smallOutSize / (long)numTasks;

        DAG dag = DAG.create("JoinDataGen");

        ProcessorDescriptor descriptor = ProcessorDescriptor.create(GenDataProcessor.class.getName());
        byte[] payloadBytes = GenDataProcessor.createConfiguration(perTaskLarge, perTaskSmall);
        UserPayload payload = UserPayload.create(ByteBuffer.wrap(payloadBytes));
        ProcessorDescriptor configured = (ProcessorDescriptor)descriptor.setUserPayload(payload);
        Vertex genDataVertex = Vertex.create("datagen", configured, numTasks);

        // Register the three sinks in the same order: stream, hash, expected.
        String[] sinkNames = {STREAM_OUTPUT_NAME, HASH_OUTPUT_NAME, EXPECTED_OUTPUT_NAME};
        Path[] sinkPaths = {largeOutPath, smallOutPath, expectedOutputPath};
        for (int i = 0; i < sinkNames.length; ++i) {
            genDataVertex.addDataSink(sinkNames[i], MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, sinkPaths[i].toUri().toString()).build());
        }

        dag.addVertex(genDataVertex);
        return dag;
    }

    /**
     * Reports whether an output location is already present on the filesystem.
     *
     * @param fs   filesystem to query
     * @param path location to test
     * @return 0 if the path does not exist; 2 (after printing a message to
     *         standard error) if it does
     * @throws IOException if the existence check fails
     */
    private int checkOutputDirectory(FileSystem fs, Path path) throws IOException {
        if (!fs.exists(path)) {
            return 0;
        }
        System.err.println("Output directory: " + path + " already exists");
        return 2;
    }

    /**
     * Processor that writes three related key sets: a "stream" (large) output,
     * a "hash" (small) output and the "expected" output holding the keys
     * written to both of the others.
     *
     * <p>Target sizes for the stream and hash outputs arrive as two
     * big-endian {@code long}s in the user payload (see
     * {@link #createConfiguration(long, long)}). Sizes are measured with
     * {@code Text.getLength()} of each key written.
     */
    public static class GenDataProcessor
    extends SimpleMRProcessor {
        private static final Log LOG = LogFactory.getLog(GenDataProcessor.class);
        /** Number of random letters that start every generated key. */
        private static final int KEY_PREFIX_LENGTH = 13;
        /** Number of distinct letters used for the random prefix. */
        private static final int ALPHABET_SIZE = 26;
        long streamOutputFileSize;
        long hashOutputFileSize;
        float overlapApprox = 0.2f;
        /** Single generator reused for all keys instead of one per key. */
        private final Random random = new Random();

        public GenDataProcessor(ProcessorContext context) {
            super(context);
        }

        /**
         * Serializes the two per-task target sizes into the payload format
         * read back by {@link #initialize()}.
         *
         * @param streamOutputFileSize target size of the stream (large) output
         * @param hashOutputFileSize   target size of the hash (small) output
         * @return 16 bytes: the stream size followed by the hash size
         * @throws IOException if writing to the in-memory stream fails
         */
        public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            try {
                dos.writeLong(streamOutputFileSize);
                dos.writeLong(hashOutputFileSize);
            }
            finally {
                // Closing the wrapper also flushes and closes the byte buffer.
                dos.close();
            }
            return bos.toByteArray();
        }

        /**
         * Reads the stream and hash target sizes from the user payload.
         *
         * @throws Exception if the payload cannot be read
         */
        public void initialize() throws Exception {
            byte[] payload = this.getContext().getUserPayload().deepCopyAsArray();
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(payload));
            try {
                this.streamOutputFileSize = dis.readLong();
                this.hashOutputFileSize = dis.readLong();
            }
            finally {
                dis.close();
            }
            LOG.info("Initialized with largeFileTargetSize=" + this.streamOutputFileSize + ", smallFileTragetSize=" + this.hashOutputFileSize);
        }

        /**
         * Generates keys until the stream output reaches its target size.
         *
         * <p>Every key goes to the stream output. Every {@code mod}-th key is
         * also written to the hash and expected outputs. When the requested
         * hash/stream size ratio exceeds {@code overlapApprox}, every
         * {@code extraKeysMod}-th iteration additionally writes an
         * upper-case key to the hash output only.
         *
         * @throws IllegalStateException if the processor has inputs, does not
         *         have exactly three outputs, or the hash/stream size ratio
         *         is not in (0, 1]
         */
        public void run() throws Exception {
            Preconditions.checkState(this.getInputs().size() == 0, "GenDataProcessor expects no inputs");
            Preconditions.checkState(this.getOutputs().size() == 3, "GenDataProcessor expects exactly 3 outputs");
            KeyValueWriter streamOutputWriter = (KeyValueWriter)((LogicalOutput)this.getOutputs().get(JoinDataGen.STREAM_OUTPUT_NAME)).getWriter();
            KeyValueWriter hashOutputWriter = (KeyValueWriter)((LogicalOutput)this.getOutputs().get(JoinDataGen.HASH_OUTPUT_NAME)).getWriter();
            KeyValueWriter expectedOutputWriter = (KeyValueWriter)((LogicalOutput)this.getOutputs().get(JoinDataGen.EXPECTED_OUTPUT_NAME)).getWriter();

            float fileSizeFraction = (float)this.hashOutputFileSize / (float)this.streamOutputFileSize;
            Preconditions.checkState(fileSizeFraction > 0.0f && fileSizeFraction <= 1.0f, "Ratio of hash output size to stream output size must be in (0, 1]");

            int mod;
            int extraKeysMod = 0;
            if (fileSizeFraction > this.overlapApprox) {
                // Overlap is capped at overlapApprox; the remainder of the
                // hash output is filled with non-overlapping keys.
                mod = (int)(1.0f / this.overlapApprox);
                extraKeysMod = (int)(1.0f / (fileSizeFraction - this.overlapApprox));
            } else {
                mod = (int)(1.0f / fileSizeFraction);
            }
            LOG.info("Using mod=" + mod + ", extraKeysMod=" + extraKeysMod);

            long count = 0L;
            long sizeLarge = 0L;
            long sizeSmall = 0L;
            long numLargeFileKeys = 0L;
            long numSmallFileKeys = 0L;
            long numExpectedKeys = 0L;
            while (sizeLarge < this.streamOutputFileSize) {
                Text text = new Text(this.createOverlapString(KEY_PREFIX_LENGTH, count));
                int size = text.getLength();
                streamOutputWriter.write(text, NullWritable.get());
                sizeLarge += (long)size;
                ++numLargeFileKeys;
                if (count % (long)mod == 0L) {
                    hashOutputWriter.write(text, NullWritable.get());
                    sizeSmall += (long)size;
                    ++numSmallFileKeys;
                    expectedOutputWriter.write(text, NullWritable.get());
                    ++numExpectedKeys;
                }
                if (extraKeysMod != 0 && count % (long)extraKeysMod == 0L) {
                    Text nText = new Text(this.createNonOverlapString(KEY_PREFIX_LENGTH, count));
                    hashOutputWriter.write(nText, NullWritable.get());
                    sizeSmall += (long)nText.getLength();
                    ++numSmallFileKeys;
                }
                ++count;
            }
            LOG.info("OutputStats: largeFileNumKeys=" + numLargeFileKeys + ", smallFileNumKeys=" + numSmallFileKeys + ", expFileNumKeys=" + numExpectedKeys + ", largeFileSize=" + sizeLarge + ", smallFileSize=" + sizeSmall);
        }

        /** Builds a key with a lower-case random prefix (used for keys that may appear in both outputs). */
        private String createOverlapString(int size, long count) {
            return this.createKey('a', size, count);
        }

        /** Builds a key with an upper-case random prefix (used for hash-only keys). */
        private String createNonOverlapString(int size, long count) {
            return this.createKey('A', size, count);
        }

        /**
         * Builds {@code <size random letters>_<taskIndex>_<count>}.
         *
         * <p>Letters are drawn with {@code Random.nextInt(bound)}, which is
         * always within {@code [0, bound)}. The previous
         * {@code Math.abs(nextInt()) % 26} yields a negative offset when
         * {@code nextInt()} returns {@code Integer.MIN_VALUE}.
         *
         * @param firstLetter first letter of the 26-letter range to draw from
         * @param size        number of random letters in the prefix
         * @param count       sequence number appended to the key
         */
        private String createKey(char firstLetter, int size, long count) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < size; ++i) {
                sb.append((char)(firstLetter + this.random.nextInt(ALPHABET_SIZE)));
            }
            sb.append("_").append(this.getContext().getTaskIndex()).append("_").append(count);
            return sb.toString();
        }
    }
}

