package org.apache.pig.backend.hadoop.executionengine.spark.plan;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.builtin.GetMemNumRows;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LOJoin;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.class */
public class SparkCompiler extends PhyPlanVisitor {
    /** Class-wide logger. */
    private static final Log LOG = LogFactory.getLog(SparkCompiler.class);
    /** Pig context supplied to the constructor; also handed to the temporary loads created by getLoad. */
    private PigContext pigContext;
    /** Properties read from the Pig context at construction time. */
    private Properties pigProperties;
    /** Physical plan that this compiler walks and compiles. */
    private PhysicalPlan physicalPlan;
    /** Plan of Spark operators being built; exposed through getSparkPlan(). */
    private SparkOperPlan sparkPlan;
    /** Spark operator that physical operators are currently being added to. */
    private SparkOperator curSparkOp;
    /** Operator-key scope, taken from the first root of the physical plan in compile(). */
    private String scope;
    /** Spark operators compiled for the predecessors of the physical operator currently being compiled. */
    private SparkOperator[] compiledInputs;
    /** Maps the key of each POSplit already visited to the Spark operator that ends with its split store. */
    private Map<OperatorKey, SparkOperator> splitsSeen;
    /** Source of node ids for the operator keys created by this compiler. */
    private NodeIdGenerator nig;
    /** Records, for each compiled physical operator, the Spark operator it was placed in. */
    private Map<PhysicalOperator, SparkOperator> phyToSparkOpMap;
    /** UDF finder instance created in the constructor. */
    private UDFFinder udfFinder;

    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler$FindKeyTypeVisitor.class */
    /**
     * Depth-first plan visitor that remembers the result type of the
     * projections it encounters. After a walk, {@code keyType} holds the
     * result type of the most recently visited {@link POProject}, or 0 when
     * no projection was visited.
     */
    private static class FindKeyTypeVisitor extends PhyPlanVisitor {
        /** Result type of the last projection visited; starts at 0. */
        byte keyType = (byte) 0;

        /**
         * @param physicalPlan the plan to walk
         */
        FindKeyTypeVisitor(PhysicalPlan physicalPlan) {
            super(physicalPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
        }

        /** Captures the result type of the visited projection. */
        @Override
        public void visitProject(POProject pOProject) throws VisitorException {
            keyType = pOProject.getResultType();
        }
    }

    /**
     * Creates a compiler that walks the given physical plan depth-first and
     * builds an initially empty Spark operator plan.
     *
     * @param physicalPlan the physical plan to compile
     * @param pigContext   the Pig context; its properties are read here
     */
    public SparkCompiler(PhysicalPlan physicalPlan, PigContext pigContext) {
        super(physicalPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));

        // Inputs handed to the compiler.
        this.physicalPlan = physicalPlan;
        this.pigContext = pigContext;
        this.pigProperties = pigContext.getProperties();

        // Compilation state, all empty to begin with.
        this.compiledInputs = null;
        this.sparkPlan = new SparkOperPlan();
        this.phyToSparkOpMap = new HashMap<PhysicalOperator, SparkOperator>();
        this.splitsSeen = new HashMap<OperatorKey, SparkOperator>();

        // Helpers.
        this.udfFinder = new UDFFinder();
        this.nig = NodeIdGenerator.getGenerator();
    }

    /**
     * Compiles the whole physical plan into the Spark operator plan.
     *
     * <p>The scope is taken from the first root. Outside the illustrator
     * every leaf must be a {@link POStore}. The operators compiled are the
     * stores (or, in the illustrator, the leaves) followed by all
     * {@link PONative} operators, sorted by their natural ordering.
     *
     * @throws IOException      propagated from compiling an operator
     * @throws PlanException    propagated from compiling an operator
     * @throws VisitorException if the plan has no roots (2053), a leaf is not
     *                          a store (2025), or a visitor fails
     */
    public void compile() throws IOException, PlanException, VisitorException {
        List<PhysicalOperator> planRoots = physicalPlan.getRoots();
        if (planRoots == null || planRoots.isEmpty()) {
            throw new SparkCompilerException("Internal error. Did not find roots in the physical physicalPlan.", 2053, (byte) 4);
        }
        scope = planRoots.get(0).getOperatorKey().getScope();

        List<PhysicalOperator> planLeaves = physicalPlan.getLeaves();
        final boolean illustrating = pigContext.inIllustrator;
        if (!illustrating) {
            for (PhysicalOperator leaf : planLeaves) {
                if (!(leaf instanceof POStore)) {
                    throw new SparkCompilerException("Expected leaf of reduce physicalPlan to always be POStore. Found " + leaf.getClass().getSimpleName(), 2025, (byte) 4);
                }
            }
        }

        List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan, POStore.class);
        List<PONative> natives = PlanHelper.getPhysicalOperators(physicalPlan, PONative.class);

        // The illustrator compiles from the leaves; normal runs compile from the stores.
        List<? extends PhysicalOperator> primary = illustrating ? planLeaves : stores;
        List<PhysicalOperator> targets = new ArrayList<PhysicalOperator>(primary.size() + natives.size());
        targets.addAll(primary);
        targets.addAll(natives);
        Collections.sort(targets);

        for (PhysicalOperator target : targets) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Starting compile of leaf-level operator " + target);
            }
            compile(target);
        }
    }

    /**
     * Recursively compiles one physical operator.
     *
     * <ul>
     *   <li>A {@link PONative} is visited directly; its predecessors are not
     *       compiled here.</li>
     *   <li>An operator without predecessors starts a fresh Spark operator.</li>
     *   <li>A {@link POLoad} with a predecessor (which must be a single store
     *       or native operator) starts a fresh Spark operator connected to
     *       the predecessor's Spark operator; the physical link is removed.</li>
     *   <li>Otherwise the sorted predecessors are compiled first and their
     *       Spark operators become {@code compiledInputs} for the visit.</li>
     * </ul>
     *
     * The previous {@code compiledInputs} are restored after the visit.
     *
     * @param physicalOperator the operator to compile
     * @throws PlanException if a load has an unexpected number or kind of
     *                       predecessors (2125 / 2126)
     */
    private void compile(PhysicalOperator physicalOperator) throws IOException, PlanException, VisitorException {
        final SparkOperator[] savedInputs = compiledInputs;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Compiling physical operator " + physicalOperator + ". Current spark operator is " + curSparkOp);
        }

        List<PhysicalOperator> preds = physicalPlan.getPredecessors(physicalOperator);
        if (physicalOperator instanceof PONative) {
            // Nothing to prepare: fall through to the visit below.
        } else if (preds == null || preds.isEmpty()) {
            // Starting point of a pipeline: open a new Spark operator.
            curSparkOp = getSparkOp();
            curSparkOp.add(physicalOperator);
            if (physicalOperator instanceof POLoad) {
                FileSpec loadFile = ((POLoad) physicalOperator).getLFile();
                if (loadFile != null && loadFile.getFuncSpec() != null) {
                    curSparkOp.UDFs.add(loadFile.getFuncSpec().toString());
                }
            }
            sparkPlan.add(curSparkOp);
            phyToSparkOpMap.put(physicalOperator, curSparkOp);
            return;
        } else if (physicalOperator instanceof POLoad) {
            if (preds.size() != 1) {
                throw new PlanException("Expected at most one predecessor of load. Got " + preds.size(), 2125, (byte) 4);
            }
            PhysicalOperator upstream = preds.get(0);
            boolean acceptable = upstream instanceof POStore || upstream instanceof PONative;
            if (!acceptable) {
                throw new PlanException("Predecessor of load should be a store or spark operator. Got " + upstream.getClass(), 2126, (byte) 4);
            }
            SparkOperator producer = phyToSparkOpMap.get(upstream);
            curSparkOp = getSparkOp();
            curSparkOp.add(physicalOperator);
            sparkPlan.add(curSparkOp);
            // Replace the physical dependency by a dependency between Spark operators.
            physicalPlan.disconnect(physicalOperator, upstream);
            sparkPlan.connect(producer, curSparkOp);
            phyToSparkOpMap.put(physicalOperator, curSparkOp);
            return;
        } else {
            Collections.sort(preds);
            compiledInputs = new SparkOperator[preds.size()];
            int slot = 0;
            for (PhysicalOperator pred : preds) {
                if (pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())) {
                    // Split already compiled: read its split store in a new Spark operator.
                    compiledInputs[slot] = startNew(((POSplit) pred).getSplitStore(), splitsSeen.get(pred.getOperatorKey()), null);
                } else {
                    compile(pred);
                    compiledInputs[slot] = curSparkOp;
                }
                slot++;
            }
        }

        physicalOperator.visit(this);
        compiledInputs = savedInputs;
    }

    /**
     * Creates a Spark operator with a newly generated key in the current
     * scope. The operator is not added to the Spark plan here.
     *
     * @return the new Spark operator
     */
    private SparkOperator getSparkOp() {
        final OperatorKey freshKey = OperatorKey.genOpKey(scope);
        final SparkOperator created = new SparkOperator(freshKey);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new Spark operator " + created);
        }
        return created;
    }

    /**
     * @return the Spark operator plan built by this compiler
     */
    public SparkOperPlan getSparkPlan() {
        return sparkPlan;
    }

    /**
     * Mirrors the soft links of the physical plan in the Spark plan: for
     * every soft-link predecessor of a physical operator, the Spark operators
     * of the two are connected unless they are the same operator or are
     * already connected.
     *
     * @throws PlanException if connecting two Spark operators fails
     * @throws IOException   declared by the signature
     */
    public void connectSoftLink() throws PlanException, IOException {
        for (PhysicalOperator target : physicalPlan) {
            List<PhysicalOperator> softPreds = physicalPlan.getSoftLinkPredecessors(target);
            if (softPreds == null) {
                continue;
            }
            for (PhysicalOperator source : softPreds) {
                SparkOperator from = phyToSparkOpMap.get(source);
                SparkOperator to = phyToSparkOpMap.get(target);
                if (from == to) {
                    continue;
                }
                List<SparkOperator> existing = sparkPlan.getPredecessors(to);
                if (existing == null || !existing.contains(from)) {
                    sparkPlan.connect(from, to);
                }
            }
        }
    }

    /**
     * Opens a new Spark operator that begins with a temporary load of the
     * given file and is connected after {@code sparkOperator} in the Spark plan.
     *
     * @param fileSpec      the file the temporary load reads
     * @param sparkOperator the Spark operator the new one depends on
     * @param operatorKey   key for the load, or {@code null} to generate one
     * @return the newly created Spark operator
     * @throws PlanException if the connection cannot be made
     */
    private SparkOperator startNew(FileSpec fileSpec, SparkOperator sparkOperator, OperatorKey operatorKey) throws PlanException {
        POLoad tmpLoad = getLoad(operatorKey);
        tmpLoad.setLFile(fileSpec);

        SparkOperator successor = getSparkOp();
        successor.add(tmpLoad);

        sparkPlan.add(successor);
        sparkPlan.connect(sparkOperator, successor);
        return successor;
    }

    /**
     * Builds a temporary load operator bound to this compiler's Pig context.
     *
     * @param operatorKey key to use, or {@code null} to create one from the
     *                    current scope and the next node id
     * @return the load, flagged as a temporary load
     */
    private POLoad getLoad(OperatorKey operatorKey) {
        OperatorKey loadKey = operatorKey;
        if (loadKey == null) {
            loadKey = new OperatorKey(scope, nig.getNextNodeId(scope));
        }
        POLoad tmpLoad = new POLoad(loadKey);
        tmpLoad.setPc(pigContext);
        tmpLoad.setIsTmpLoad(true);
        return tmpLoad;
    }

    /**
     * Compiles a split: the single compiled input is terminated with a
     * temporary store to the split's file and marked as a splitter, then a
     * new Spark operator loading that file becomes the current one. The load
     * reuses the key of the split's first predecessor when there is one.
     *
     * @param pOSplit the split being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitSplit(POSplit pOSplit) throws VisitorException {
        try {
            List<PhysicalOperator> preds = physicalPlan.getPredecessors(pOSplit);
            OperatorKey firstPredKey = (preds != null && preds.size() > 0)
                    ? preds.get(0).getOperatorKey()
                    : null;

            FileSpec splitFile = pOSplit.getSplitStore();
            SparkOperator splitter = endSingleInputPlanWithStr(splitFile);
            splitter.setSplitter(true);
            splitsSeen.put(pOSplit.getOperatorKey(), splitter);

            curSparkOp = startNew(splitFile, splitter, firstPredKey);
            phyToSparkOpMap.put(pOSplit, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOSplit.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a distinct operator by handing it to {@code addToPlan} and
     * recording the Spark operator that is current afterwards.
     *
     * @param pODistinct the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitDistinct(PODistinct pODistinct) throws VisitorException {
        try {
            addToPlan(pODistinct);
            phyToSparkOpMap.put(pODistinct, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pODistinct.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Terminates the first compiled input with a temporary store writing to
     * the given file.
     *
     * @param fileSpec the file the temporary store writes to
     * @return the Spark operator whose plan received the store
     * @throws PlanException if more than one compiled input is present (2023)
     */
    private SparkOperator endSingleInputPlanWithStr(FileSpec fileSpec) throws PlanException {
        if (compiledInputs.length > 1) {
            throw new PlanException("Received a multi input physicalPlan when expecting only a single input one.", 2023, (byte) 4);
        }
        SparkOperator onlyInput = compiledInputs[0];
        POStore tmpStore = getStore();
        tmpStore.setSFile(fileSpec);
        onlyInput.physicalPlan.addAsLeaf(tmpStore);
        return onlyInput;
    }

    /**
     * Builds a store operator with a new key in the current scope and flags
     * it as a temporary store.
     *
     * @return the temporary store
     */
    private POStore getStore() {
        OperatorKey storeKey = new OperatorKey(scope, nig.getNextNodeId(scope));
        POStore tmpStore = new POStore(storeKey);
        tmpStore.setIsTmpStore(true);
        return tmpStore;
    }

    /**
     * Compiles a load operator by handing it to {@code addToPlan} and
     * recording the Spark operator that is current afterwards.
     *
     * @param pOLoad the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitLoad(POLoad pOLoad) throws VisitorException {
        try {
            addToPlan(pOLoad);
            phyToSparkOpMap.put(pOLoad, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOLoad.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a native operator: a native Spark operator built from the
     * operator's jar and parameters is marked native, added to the Spark
     * plan, connected after the current Spark operator, and then becomes the
     * current one.
     *
     * @param pONative the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitNative(PONative pONative) throws VisitorException {
        try {
            NativeSparkOperator nativeOp = getNativeSparkOp(pONative.getNativeMRjar(), pONative.getParams());
            nativeOp.markNative();

            sparkPlan.add(nativeOp);
            sparkPlan.connect(curSparkOp, nativeOp);

            phyToSparkOpMap.put(pONative, nativeOp);
            curSparkOp = nativeOp;
        } catch (Exception cause) {
            String message = "Error compiling operator " + pONative.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Creates a native Spark operator with a new key in the current scope.
     *
     * @param str    first constructor argument after the key (the native jar at the call site in visitNative)
     * @param strArr second constructor argument after the key (the native parameters at that call site)
     * @return the new native Spark operator
     */
    private NativeSparkOperator getNativeSparkOp(String str, String[] strArr) {
        OperatorKey nativeKey = new OperatorKey(scope, nig.getNextNodeId(scope));
        return new NativeSparkOperator(nativeKey, str, strArr);
    }

    /**
     * Compiles a store operator and, when the store's file carries a function
     * spec, records that spec in the current Spark operator's UDF list.
     *
     * @param pOStore the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitStore(POStore pOStore) throws VisitorException {
        try {
            addToPlan(pOStore);
            phyToSparkOpMap.put(pOStore, curSparkOp);

            FileSpec target = pOStore.getSFile();
            if (target != null && target.getFuncSpec() != null) {
                curSparkOp.UDFs.add(target.getFuncSpec().toString());
            }
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOStore.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a filter operator and passes its inner plan to
     * {@code processUDFs}.
     *
     * @param pOFilter the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitFilter(POFilter pOFilter) throws VisitorException {
        try {
            addToPlan(pOFilter);
            PhysicalPlan condition = pOFilter.getPlan();
            processUDFs(condition);
            phyToSparkOpMap.put(pOFilter, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOFilter.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a cross operator by handing it to {@code addToPlan} and
     * recording the Spark operator that is current afterwards.
     *
     * @param pOCross the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitCross(POCross pOCross) throws VisitorException {
        try {
            addToPlan(pOCross);
            phyToSparkOpMap.put(pOCross, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOCross.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a stream operator by handing it to {@code addToPlan} and
     * recording the Spark operator that is current afterwards.
     *
     * @param pOStream the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitStream(POStream pOStream) throws VisitorException {
        try {
            addToPlan(pOStream);
            phyToSparkOpMap.put(pOStream, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOStream.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a sort operator. When the sort carries a limit (any value
     * other than -1), a new {@link POLimit} with that limit is appended as a
     * leaf of the current Spark operator's plan and the operator is marked
     * as limit-after-sort.
     *
     * @param pOSort the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitSort(POSort pOSort) throws VisitorException {
        try {
            addToPlan(pOSort);

            final long sortLimit = pOSort.getLimit();
            final boolean limited = sortLimit != -1L;
            if (limited) {
                OperatorKey limitKey = new OperatorKey(scope, nig.getNextNodeId(scope));
                POLimit trailingLimit = new POLimit(limitKey);
                trailingLimit.setLimit(sortLimit);
                curSparkOp.physicalPlan.addAsLeaf(trailingLimit);
                curSparkOp.markLimitAfterSort();
            }

            phyToSparkOpMap.put(pOSort, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOSort.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a limit operator and marks the current Spark operator as
     * containing a limit.
     *
     * @param pOLimit the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitLimit(POLimit pOLimit) throws VisitorException {
        try {
            addToPlan(pOLimit);
            curSparkOp.markLimit();
            phyToSparkOpMap.put(pOLimit, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOLimit.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a local-rearrange operator and passes each of its inner plans
     * (if any) to {@code processUDFs}.
     *
     * @param pOLocalRearrange the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitLocalRearrange(POLocalRearrange pOLocalRearrange) throws VisitorException {
        try {
            addToPlan(pOLocalRearrange);

            List<PhysicalPlan> innerPlans = pOLocalRearrange.getPlans();
            if (innerPlans != null) {
                for (PhysicalPlan innerPlan : innerPlans) {
                    processUDFs(innerPlan);
                }
            }

            phyToSparkOpMap.put(pOLocalRearrange, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOLocalRearrange.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a collected group. The current Spark operator's plan must have
     * exactly one root, that root must be a {@link POLoad}, and its load
     * function must implement {@link CollectableLoadFunc}; the loader is then
     * asked to keep all instances of a key in the same split before the
     * operator is added to the plan.
     *
     * @param pOCollectedGroup the operator being compiled
     * @throws VisitorException on a violated precondition (2171, 2172, 2249)
     *                          or wrapping any other failure (2034)
     */
    @Override
    public void visitCollectedGroup(POCollectedGroup pOCollectedGroup) throws VisitorException {
        List<PhysicalOperator> planRoots = curSparkOp.physicalPlan.getRoots();
        if (planRoots.size() != 1) {
            throw new SparkCompilerException("Expected one but found more then one root physical operator in physical physicalPlan.", 2171, (byte) 4);
        }
        PhysicalOperator root = planRoots.get(0);
        if (!(root instanceof POLoad)) {
            throw new SparkCompilerException("Expected physical operator at root to be POLoad. Found : " + root.getClass().getCanonicalName(), 2172, (byte) 4);
        }

        // Step 1: validate and prepare the loader.
        Object loader = ((POLoad) root).getLoadFunc();
        try {
            if (!CollectableLoadFunc.class.isAssignableFrom(loader.getClass())) {
                throw new SparkCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", 2249);
            }
            ((CollectableLoadFunc) loader).ensureAllKeyInstancesInSameSplit();
        } catch (SparkCompilerException alreadyDescriptive) {
            // Keep the specific compiler error instead of re-wrapping it below.
            throw alreadyDescriptive;
        } catch (IOException ioFailure) {
            throw new SparkCompilerException("Error compiling operator " + pOCollectedGroup.getClass().getSimpleName(), 2034, (byte) 4, ioFailure);
        }

        // Step 2: place the operator in the plan.
        try {
            addToPlan(pOCollectedGroup);
            phyToSparkOpMap.put(pOCollectedGroup, curSparkOp);
        } catch (Exception cause) {
            throw new SparkCompilerException("Error compiling operator " + pOCollectedGroup.getClass().getSimpleName(), 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a foreach operator and passes each of its input plans (if any)
     * to {@code processUDFs}.
     *
     * @param pOForEach the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitPOForEach(POForEach pOForEach) throws VisitorException {
        try {
            addToPlan(pOForEach);

            List<PhysicalPlan> innerPlans = pOForEach.getInputPlans();
            if (innerPlans != null) {
                for (PhysicalPlan innerPlan : innerPlans) {
                    processUDFs(innerPlan);
                }
            }

            phyToSparkOpMap.put(pOForEach, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOForEach.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a counter operator by handing it to {@code addToPlan} and
     * recording the Spark operator that is current afterwards.
     *
     * @param pOCounter the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitCounter(POCounter pOCounter) throws VisitorException {
        try {
            addToPlan(pOCounter);
            phyToSparkOpMap.put(pOCounter, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOCounter.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a rank operator by handing it to {@code addToPlan} and
     * recording the Spark operator that is current afterwards.
     *
     * @param pORank the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitRank(PORank pORank) throws VisitorException {
        try {
            addToPlan(pORank);
            phyToSparkOpMap.put(pORank, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pORank.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a global rearrange. A {@link POGlobalRearrangeSpark} wrapping
     * the operator is what gets added to the plan; a cross rearrange also
     * registers its key as a cross key, and the operator's custom partitioner
     * is copied onto the current Spark operator.
     *
     * @param pOGlobalRearrange the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitGlobalRearrange(POGlobalRearrange pOGlobalRearrange) throws VisitorException {
        try {
            POGlobalRearrangeSpark sparkRearrange = new POGlobalRearrangeSpark(pOGlobalRearrange);
            addToPlan(sparkRearrange);

            if (pOGlobalRearrange.isCross()) {
                String crossKey = pOGlobalRearrange.getOperatorKey().toString();
                curSparkOp.addCrossKey(crossKey);
            }
            curSparkOp.customPartitioner = pOGlobalRearrange.getCustomPartitioner();

            // The map is keyed by the original operator, not by the Spark wrapper.
            phyToSparkOpMap.put(pOGlobalRearrange, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOGlobalRearrange.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a package operator and tags the current Spark operator by
     * package type: JOIN marks a regular join; GROUP marks a group-by for a
     * single input and a cogroup for more than one input.
     *
     * @param pOPackage the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitPackage(POPackage pOPackage) throws VisitorException {
        try {
            addToPlan(pOPackage);
            phyToSparkOpMap.put(pOPackage, curSparkOp);

            Packager.PackageType packageType = pOPackage.getPkgr().getPackageType();
            if (packageType == Packager.PackageType.JOIN) {
                curSparkOp.markRegularJoin();
                return;
            }
            if (packageType != Packager.PackageType.GROUP) {
                return;
            }

            int inputCount = pOPackage.getNumInps();
            if (inputCount == 1) {
                curSparkOp.markGroupBy();
            } else if (inputCount > 1) {
                curSparkOp.markCogroup();
            }
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOPackage.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a union operator and marks the current Spark operator as a
     * union.
     *
     * @param pOUnion the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitUnion(POUnion pOUnion) throws VisitorException {
        try {
            addToPlan(pOUnion);
            phyToSparkOpMap.put(pOUnion, curSparkOp);
            curSparkOp.markUnion();
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOUnion.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a skewed join. A sampling Spark operator is obtained from
     * {@code getSkewedJoinSampleJob}, passed to
     * {@code buildBroadcastForSkewedJoin} together with a randomly suffixed
     * "pig.keyDistFile" name, marked as a sampler and added to the Spark
     * plan. The join itself is then added, given the same file name as its
     * skewed-join partition file, and connected after the sampler.
     *
     * @param pOSkewedJoin the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitSkewedJoin(POSkewedJoin pOSkewedJoin) throws VisitorException {
        try {
            final String keyDistFile = "pig.keyDistFile" + new Random().nextInt();

            // Sampling side.
            SparkOperator sampler = getSkewedJoinSampleJob(pOSkewedJoin);
            buildBroadcastForSkewedJoin(sampler, keyDistFile);
            sampler.markSampler();
            sparkPlan.add(sampler);

            // Join side, which must run after the sampler.
            addToPlan(pOSkewedJoin);
            curSparkOp.setSkewedJoinPartitionFile(keyDistFile);
            sparkPlan.connect(sampler, curSparkOp);

            phyToSparkOpMap.put(pOSkewedJoin, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOSkewedJoin.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a fragment-replicate join. The Spark operator of the fragment
     * input becomes the current one; every other compiled input gets a
     * {@link POBroadcastSpark} appended as a leaf, named after its own newly
     * generated operator key. A {@link POFRJoinSpark} wrapping the join is
     * then added to the plan.
     *
     * @param pOFRJoin the operator being compiled
     * @throws VisitorException wrapping any failure (error code 2034)
     */
    @Override
    public void visitFRJoin(POFRJoin pOFRJoin) throws VisitorException {
        try {
            List<PhysicalOperator> joinInputs = pOFRJoin.getInputs();
            PhysicalOperator fragmentInput = joinInputs.get(pOFRJoin.getFragment());
            curSparkOp = phyToSparkOpMap.get(fragmentInput);

            for (SparkOperator input : compiledInputs) {
                if (curSparkOp.equals(input)) {
                    // The fragment side is not broadcast.
                    continue;
                }
                OperatorKey broadcastKey = new OperatorKey(scope, nig.getNextNodeId(scope));
                POBroadcastSpark broadcast = new POBroadcastSpark(broadcastKey);
                broadcast.setBroadcastedVariableName(broadcastKey.toString());
                input.physicalPlan.addAsLeaf(broadcast);
            }

            addToPlan(new POFRJoinSpark(pOFRJoin));
            phyToSparkOpMap.put(pOFRJoin, curSparkOp);
        } catch (Exception cause) {
            String message = "Error compiling operator " + pOFRJoin.getClass().getSimpleName();
            throw new SparkCompilerException(message, 2034, (byte) 4, cause);
        }
    }

    /**
     * Compiles a {@link POMergeJoin} into the Spark plan.
     *
     * <p>The Spark operator that produced the join's first input becomes {@code curSparkOp} and
     * receives the join as a new leaf; the other compiled input is treated as the right side.
     * The right side's plan must have exactly one root and that root must be a {@link POLoad}.
     * Any operators following that load are cloned (minus the load) and handed to
     * {@link POMergeJoin#setupRightPipeline}, and the original right plan is trimmed below the load.
     *
     * <p>Two cases follow, depending on the right loader:
     * <ul>
     *   <li>The loader is an {@link IndexableLoadFunc}: the join is pointed at the loader directly,
     *       the right Spark operator is removed from the Spark plan, and every operator of every
     *       join-key plan must be a {@link POProject}.</li>
     *   <li>Otherwise the loader must be an {@link OrderedLoadFunc} (and the join type must not be
     *       {@code MERGESPARSE}): the right load is rewritten to use {@link MergeJoinIndexer},
     *       a store to a temporary file is appended, the right operator is marked as an indexer
     *       and connected to {@code curSparkOp}, and the join is configured to read through
     *       {@code DefaultIndexableLoader} using that temporary index file.</li>
     * </ul>
     *
     * @param pOMergeJoin the merge join operator being compiled
     * @throws VisitorException a {@code SparkCompilerException}; validation failures raised by this
     *         method keep their own error code, any other failure is wrapped with code 2034
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor
    public void visitMergeJoin(POMergeJoin pOMergeJoin) throws VisitorException {
        PhysicalPlan rightPipeline;
        try {
            if (this.compiledInputs.length != 2 || pOMergeJoin.getInputs().size() != 2) {
                throw new SparkCompilerException("Merge Join must have exactly two inputs. Found : " + this.compiledInputs.length, 1101);
            }
            // The operator that compiled the join's first input hosts the join; the other one is the right side.
            this.curSparkOp = this.phyToSparkOpMap.get(pOMergeJoin.getInputs().get(0));
            SparkOperator rightSparkOp = this.curSparkOp.equals(this.compiledInputs[0]) ? this.compiledInputs[1] : this.compiledInputs[0];
            PhysicalPlan rightPlan = rightSparkOp.physicalPlan;
            if (rightPlan.getRoots().size() != 1) {
                throw new SparkCompilerException("Expected one but found more then one root physical operator in physical plan.", 2171);
            }
            PhysicalOperator rightRoot = rightPlan.getRoots().get(0);
            if (!(rightRoot instanceof POLoad)) {
                throw new SparkCompilerException("Expected physical operator at root to be POLoad. Found : " + rightRoot.getClass().getCanonicalName(), 2172);
            }
            if (rightPlan.getSuccessors(rightRoot) == null || rightPlan.getSuccessors(rightRoot).isEmpty()) {
                // Nothing follows the load, so there is no right-side pipeline to replay.
                rightPipeline = null;
            } else {
                // Work on a clone so the pipeline handed to the join no longer contains the load.
                rightPipeline = rightPlan.m92clone();
                PhysicalOperator clonedRoot = rightPipeline.getRoots().get(0);
                rightPipeline.disconnect(clonedRoot, rightPipeline.getSuccessors(clonedRoot).get(0));
                rightPipeline.remove(clonedRoot);
                // NOTE(review): the cast to PhysicalPlan looks like a decompiler artifact - confirm
                // against the declared parameter type of trimBelow before relying on it.
                rightPlan.trimBelow((PhysicalPlan) rightRoot);
            }
            pOMergeJoin.setupRightPipeline(rightPipeline);
            rightSparkOp.setRequestedParallelism(1);
            POLoad rightLoad = (POLoad) rightSparkOp.physicalPlan.getRoots().get(0);
            pOMergeJoin.setSignature(rightLoad.getSignature());
            LoadFunc rightLoadFunc = rightLoad.getLoadFunc();
            if (IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
                // The loader can be used by the join directly: no separate indexing operator is kept.
                pOMergeJoin.setRightLoaderFuncSpec(rightLoad.getLFile().getFuncSpec());
                pOMergeJoin.setRightInputFileName(rightLoad.getLFile().getFileName());
                this.curSparkOp.UDFs.add(rightLoad.getLFile().getFuncSpec().toString());
                this.sparkPlan.remove(rightSparkOp);
                if (rightSparkOp == this.compiledInputs[0]) {
                    this.compiledInputs[0] = null;
                } else if (rightSparkOp == this.compiledInputs[1]) {
                    this.compiledInputs[1] = null;
                }
                // Every operator in every join-key plan has to be a plain projection in this mode.
                int size = ((PhysicalPlan) this.mPlan).getPredecessors(pOMergeJoin).size();
                for (int i = 0; i < size; i++) {
                    for (PhysicalPlan keyPlan : pOMergeJoin.getInnerPlansOf(i)) {
                        Iterator<PhysicalOperator> keyOps = keyPlan.iterator();
                        while (keyOps.hasNext()) {
                            if (!(keyOps.next() instanceof POProject)) {
                                throw new SparkCompilerException("Merge join is possible only for simple column or '*' join keys when using " + rightLoad.getLFile().getFuncSpec() + " as the loader", 1106, (byte) 2);
                            }
                        }
                    }
                }
            } else {
                if (pOMergeJoin.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
                    throw new SparkCompilerException("Right input of merge-join must implement IndexableLoadFunc. The specified loader " + rightLoadFunc + " doesn't implement it", 1104);
                }
                if (!OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
                    throw new SparkCompilerException("Right input of merge-join must implement OrderedLoadFunc interface. The specified loader " + rightLoadFunc + " doesn't implement it", 1104);
                }
                // Rewrite the right load to run through MergeJoinIndexer, passing the original loader,
                // the right-side key plans and the right pipeline as constructor arguments.
                List<PhysicalPlan> rightKeyPlans = pOMergeJoin.getInnerPlansOf(1);
                FileSpec originalFile = rightLoad.getLFile();
                rightLoad.setLFile(new FileSpec(rightLoad.getLFile().getFileName(), new FuncSpec(MergeJoinIndexer.class.getName(), new String[]{originalFile.getFuncSpec().toString(), ObjectSerializer.serialize((Serializable) rightKeyPlans), ObjectSerializer.serialize(rightPipeline), rightLoad.getSignature(), rightLoad.getOperatorKey().scope, Boolean.toString(true)})));
                rightSparkOp.useTypedComparator(true);
                // The right operator's output is written to a temporary file that the join later reads as its index.
                POStore store = getStore();
                FileSpec indexFile = getTempFileSpec();
                store.setSFile(indexFile);
                rightSparkOp.physicalPlan.addAsLeaf(store);
                rightSparkOp.markIndexer();
                this.curSparkOp.UDFs.add(originalFile.getFuncSpec().toString());
                this.sparkPlan.connect(rightSparkOp, this.curSparkOp);
                pOMergeJoin.setRightLoaderFuncSpec(new FuncSpec(DefaultIndexableLoader.class.getName(), new String[]{originalFile.getFuncSpec().toString(), indexFile.getFileName(), indexFile.getFuncSpec().toString(), pOMergeJoin.getOperatorKey().scope, originalFile.getFileName()}));
                pOMergeJoin.setRightInputFileName(originalFile.getFileName());
                pOMergeJoin.setIndexFile(indexFile.getFileName());
            }
            this.curSparkOp.physicalPlan.addAsLeaf(pOMergeJoin);
            this.phyToSparkOpMap.put(pOMergeJoin, this.curSparkOp);
        } catch (SparkCompilerException e) {
            // Validation failures raised above already carry a specific error code and message;
            // rethrow them unchanged (as visitMergeCoGroup does) instead of masking them as 2034.
            throw e;
        } catch (Exception e) {
            throw new SparkCompilerException("Error compiling operator " + pOMergeJoin.getClass().getSimpleName(), 2034, (byte) 4, e);
        }
    }

    /**
     * Records the scalars and UDFs referenced by a plan on the current Spark operator.
     *
     * <p>A {@code null} plan is ignored. Otherwise a {@link ScalarPhyFinder} and the shared
     * {@code udfFinder} are run over the plan and their results are added to
     * {@code curSparkOp.scalars} and {@code curSparkOp.UDFs} respectively.
     *
     * @param physicalPlan the plan to scan; may be {@code null}
     * @throws VisitorException if one of the finders fails while visiting the plan
     */
    private void processUDFs(PhysicalPlan physicalPlan) throws VisitorException {
        if (physicalPlan == null) {
            return;
        }

        ScalarPhyFinder scalarFinder = new ScalarPhyFinder(physicalPlan);
        scalarFinder.visit();
        this.curSparkOp.scalars.addAll(scalarFinder.getScalars());

        this.udfFinder.setPlan(physicalPlan);
        this.udfFinder.visit();
        this.curSparkOp.UDFs.addAll(this.udfFinder.getUDFs());
    }

    /**
     * Appends an operator as a leaf of the Spark operator built from the compiled inputs.
     *
     * <p>With exactly one compiled input that input is used as-is; otherwise all compiled inputs
     * are merged into a single Spark operator first. The chosen operator becomes {@code curSparkOp}.
     *
     * @param physicalOperator the physical operator to append
     * @throws PlanException if merging or appending to the plan fails
     * @throws IOException declared by the original signature
     */
    private void addToPlan(PhysicalOperator physicalOperator) throws PlanException, IOException {
        SparkOperator target;
        if (this.compiledInputs.length == 1) {
            target = this.compiledInputs[0];
        } else {
            target = merge(this.compiledInputs);
        }
        target.physicalPlan.addAsLeaf(physicalOperator);
        this.curSparkOp = target;
    }

    /**
     * Folds several Spark operators into one freshly created Spark operator.
     *
     * <p>The new operator is added to the Spark plan, receives the merged physical plans of all
     * sources, and is connected to every predecessor any source had. For each source it then
     * takes over the largest requested parallelism, the UDFs, the scalars and the cross keys,
     * re-points all {@code phyToSparkOpMap} entries from the source to the new operator, and
     * finally removes the source from the Spark plan.
     *
     * @param sparkOperatorArr the operators to merge
     * @return the new operator that replaces them
     * @throws PlanException if a plan manipulation fails
     */
    private SparkOperator merge(SparkOperator[] sparkOperatorArr) throws PlanException {
        SparkOperator merged = getSparkOp();
        this.sparkPlan.add(merged);

        HashSet<SparkOperator> upstream = new HashSet<>();
        List<SparkOperator> sources = new ArrayList<>();
        List<PhysicalPlan> sourcePlans = new ArrayList<>();
        for (SparkOperator source : sparkOperatorArr) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Merging Spark operator" + source);
            }
            sources.add(source);
            sourcePlans.add(source.physicalPlan);
            List<SparkOperator> preds = this.sparkPlan.getPredecessors(source);
            if (preds != null) {
                upstream.addAll(preds);
            }
        }

        merge(merged.physicalPlan, sourcePlans);
        for (SparkOperator pred : upstream) {
            this.sparkPlan.connect(pred, merged);
        }

        for (SparkOperator source : sources) {
            // Keep the highest parallelism requested by any source.
            if (source.requestedParallelism > merged.requestedParallelism) {
                merged.requestedParallelism = source.requestedParallelism;
            }
            for (String udf : source.UDFs) {
                if (!merged.UDFs.contains(udf)) {
                    merged.UDFs.add(udf);
                }
            }
            for (PhysicalOperator scalar : source.scalars) {
                if (!merged.scalars.contains(scalar)) {
                    merged.scalars.add(scalar);
                }
            }
            if (source.getCrossKeys() != null) {
                for (String crossKey : source.getCrossKeys()) {
                    merged.addCrossKey(crossKey);
                }
            }

            // Collect the keys first so the map is not modified while its entry set is iterated.
            HashSet<PhysicalOperator> owned = new HashSet<>();
            for (Map.Entry<PhysicalOperator, SparkOperator> mapping : this.phyToSparkOpMap.entrySet()) {
                if (mapping.getValue() == source) {
                    owned.add(mapping.getKey());
                }
            }
            for (PhysicalOperator op : owned) {
                this.phyToSparkOpMap.put(op, merged);
            }
            this.sparkPlan.remove(source);
        }
        return merged;
    }

    /**
     * Merges every plan in {@code list} into {@code e}, then sorts the leaves list returned by
     * {@code e.getLeaves()}.
     *
     * @param e the plan that receives the others
     * @param list the plans to merge into {@code e}, in order
     * @throws PlanException if one of the merges fails
     */
    private <O extends Operator<?>, E extends OperatorPlan<O>> void merge(E e, List<E> list) throws PlanException {
        for (E other : list) {
            e.merge(other);
        }
        Collections.sort(e.getLeaves());
    }

    /**
     * Compiles a {@link POMergeCogroup} into the Spark plan.
     *
     * <p>Requires at least two compiled inputs, each with a single {@link POLoad} root. The
     * first input's loader must be a {@link CollectableLoadFunc}; every other ("side") loader
     * must be an {@link IndexableLoadFunc}, and its Spark operator is removed from the plan after
     * its func spec, file name and signature have been recorded on the cogroup. A new Spark
     * operator is created for the indexing job, connected ahead of the Spark operator of the
     * cogroup's first input, and the cogroup is appended to that latter operator, which becomes
     * {@code curSparkOp}.
     *
     * @param pOMergeCogroup the merge cogroup operator being compiled
     * @throws VisitorException a {@code SparkCompilerException} describing the failure
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor
    public void visitMergeCoGroup(POMergeCogroup pOMergeCogroup) throws VisitorException {
        if (this.compiledInputs.length < 2) {
            throw new SparkCompilerException("Merge Cogroup work on two or more relations.To use map-side group-by on single relation, use 'collected' qualifier.", 2251);
        }
        // One slot per side input, i.e. every input except the first.
        List<FuncSpec> sideFuncSpecs = new ArrayList<>(this.compiledInputs.length - 1);
        List<String> sideFileNames = new ArrayList<>(this.compiledInputs.length - 1);
        List<String> sideSignatures = new ArrayList<>(this.compiledInputs.length - 1);
        try {
            pOMergeCogroup.setEndOfRecordMark((byte) 1);
            for (int idx = 0; idx < this.compiledInputs.length; idx++) {
                SparkOperator inputOp = this.compiledInputs[idx];
                PhysicalPlan inputPlan = inputOp.physicalPlan;
                if (inputPlan.getRoots().size() != 1) {
                    throw new SparkCompilerException("Expected one but found more then one root physical operator in physical plan.", 2171, (byte) 4);
                }
                PhysicalOperator root = inputPlan.getRoots().get(0);
                if (!(root instanceof POLoad)) {
                    throw new SparkCompilerException("Expected physical operator at root to be POLoad. Found : " + root.getClass().getCanonicalName(), 2172);
                }
                POLoad load = (POLoad) root;
                FileSpec loadFile = load.getLFile();
                FuncSpec loadFuncSpec = loadFile.getFuncSpec();
                Object loader = load.getLoadFunc();
                if (idx != 0) {
                    // Side input: remember how to load it and drop its Spark operator from the plan.
                    if (!IndexableLoadFunc.class.isAssignableFrom(loader.getClass())) {
                        throw new SparkCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", 2253);
                    }
                    sideFuncSpecs.add(loadFuncSpec);
                    sideFileNames.add(loadFile.getFileName());
                    sideSignatures.add(load.getSignature());
                    this.sparkPlan.remove(inputOp);
                } else {
                    // Base input.
                    if (!CollectableLoadFunc.class.isAssignableFrom(loader.getClass())) {
                        throw new SparkCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", 2252);
                    }
                    ((CollectableLoadFunc) loader).ensureAllKeyInstancesInSameSplit();
                }
            }
            pOMergeCogroup.setSideLoadFuncs(sideFuncSpecs);
            pOMergeCogroup.setSideFileSpecs(sideFileNames);
            pOMergeCogroup.setLoaderSignatures(sideSignatures);

            SparkOperator baseOp = this.phyToSparkOpMap.get(pOMergeCogroup.getInputs().get(0));
            SparkOperator indexerOp = getSparkOp();
            FileSpec indexFile = getIndexingJob(indexerOp, baseOp, pOMergeCogroup.getLRInnerPlansOf(0));
            pOMergeCogroup.setIdxFuncSpec(indexFile.getFuncSpec());
            pOMergeCogroup.setIndexFileName(indexFile.getFileName());
            baseOp.physicalPlan.addAsLeaf(pOMergeCogroup);
            for (FuncSpec sideFuncSpec : sideFuncSpecs) {
                baseOp.UDFs.add(sideFuncSpec.toString());
            }
            this.sparkPlan.add(indexerOp);
            this.sparkPlan.connect(indexerOp, baseOp);
            this.phyToSparkOpMap.put(pOMergeCogroup, baseOp);
            this.curSparkOp = baseOp;
        } catch (CloneNotSupportedException cloneFailure) {
            throw new SparkCompilerException(cloneFailure);
        } catch (ExecException execFailure) {
            throw new SparkCompilerException(execFailure.getDetailedMessage(), execFailure.getErrorCode(), execFailure.getErrorSource(), execFailure);
        } catch (SparkCompilerException compilerFailure) {
            // Already in the right shape; pass it through untouched.
            throw compilerFailure;
        } catch (PlanException planFailure) {
            throw new SparkCompilerException("Error compiling operator " + pOMergeCogroup.getClass().getCanonicalName(), 2034, (byte) 4, planFailure);
        } catch (IOException ioFailure) {
            throw new SparkCompilerException("IOException caught while compiling POMergeCoGroup", 3000, ioFailure);
        }
    }

    /**
     * Populates {@code sparkOperator} with an indexing job over the base relation of a merge
     * cogroup and returns the temporary file the index is stored to.
     *
     * <p>The base relation is the {@link POLoad} at the root of {@code sparkOperator2}'s plan;
     * its loader must be an {@link OrderedLoadFunc}. A new load using {@link MergeJoinIndexer}
     * is added to {@code sparkOperator}, parameterised with the original loader spec, the
     * serialized key plans, the serialized pipeline that follows the base load (or a serialized
     * {@code null} when nothing follows it), the load's signature and scope, and {@code "false"}.
     * A store to a fresh temporary file is then appended.
     *
     * @param sparkOperator the (new) Spark operator that will run the indexing job
     * @param sparkOperator2 the Spark operator holding the base relation's plan
     * @param list the key plans passed to the indexer
     * @return the temporary file spec the index is written to
     * @throws SparkCompilerException if the base loader does not implement {@link OrderedLoadFunc}
     */
    private FileSpec getIndexingJob(SparkOperator sparkOperator, SparkOperator sparkOperator2, List<PhysicalPlan> list) throws SparkCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
        PhysicalPlan basePlan = sparkOperator2.physicalPlan;
        POLoad baseLoad = (POLoad) basePlan.getRoots().get(0);
        FileSpec baseFile = baseLoad.getLFile();
        FuncSpec baseFuncSpec = baseFile.getFuncSpec();
        if (!OrderedLoadFunc.class.isAssignableFrom(baseLoad.getLoadFunc().getClass())) {
            throw new SparkCompilerException("Base relation of merge-coGroup must implement OrderedLoadFunc interface. The specified loader " + baseFuncSpec + " doesn't implement it", 1104);
        }

        String loaderSpec = baseFuncSpec.toString();
        String serializedKeyPlans = ObjectSerializer.serialize((Serializable) list);
        String loadSignature = baseLoad.getSignature();
        String loadScope = baseLoad.getOperatorKey().scope;

        // Whatever follows the base load is cloned, stripped of the load itself, and shipped to the indexer.
        PhysicalPlan pipeline = null;
        if (basePlan.getSuccessors(baseLoad) != null && !basePlan.getSuccessors(baseLoad).isEmpty()) {
            pipeline = basePlan.m92clone();
            PhysicalOperator clonedRoot = pipeline.getRoots().get(0);
            pipeline.disconnect(clonedRoot, pipeline.getSuccessors(clonedRoot).get(0));
            pipeline.remove(clonedRoot);
        }

        String[] indexerArgs = {
            loaderSpec,
            serializedKeyPlans,
            ObjectSerializer.serialize(pipeline),
            loadSignature,
            loadScope,
            Boolean.toString(false)
        };

        POLoad indexerLoad = getLoad(null);
        indexerLoad.setLFile(new FileSpec(baseFile.getFileName(), new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
        sparkOperator.physicalPlan.add(indexerLoad);
        sparkOperator.UDFs.add(baseLoad.getLFile().getFuncSpec().toString());
        SparkUtil.createIndexerSparkNode(sparkOperator, this.scope, this.nig);

        POStore indexStore = getStore();
        FileSpec indexFile = getTempFileSpec();
        indexStore.setSFile(indexFile);
        sparkOperator.physicalPlan.addAsLeaf(indexStore);
        return indexFile;
    }

    /**
     * Builds a {@link FileSpec} for a new temporary path obtained from {@code FileLocalizer},
     * paired with a {@link FuncSpec} named by {@code Utils.getTmpFileCompressorName}.
     *
     * @return the temporary file spec
     * @throws IOException if the temporary path cannot be obtained
     */
    private FileSpec getTempFileSpec() throws IOException {
        String tempPath = FileLocalizer.getTemporaryPath(this.pigContext).toString();
        FuncSpec tempFuncSpec = new FuncSpec(Utils.getTmpFileCompressorName(this.pigContext));
        return new FileSpec(tempPath, tempFuncSpec);
    }

    /**
     * Appends a {@code POPoissonSampleSpark} leaf to the given operator's physical plan.
     *
     * <p>The sample rate, memory-usage fraction and memory size are read from the Pig properties,
     * falling back to 17, 0.3 and -1 respectively when unset.
     *
     * @param sparkOperator the Spark operator whose plan receives the sampler
     * @throws PlanException if the sampler cannot be added as a leaf
     */
    private void addSampleOperatorForSkewedJoin(SparkOperator sparkOperator) throws PlanException {
        Configuration conf = ConfigurationUtil.toConfiguration(this.pigProperties);
        PhysicalPlan targetPlan = sparkOperator.physicalPlan;

        OperatorKey samplerKey = new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope));
        int sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, 17);
        float memUsage = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE, 0.3f);
        long reduceMem = conf.getLong(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1L);

        targetPlan.addAsLeaf(new POPoissonSampleSpark(samplerKey, -1, sampleRate, memUsage, reduceMem));
    }

    /**
     * Builds the Spark operator that performs the actual sort.
     *
     * <p>A new Spark operator is started from {@code fileSpec} and given, in order, a
     * {@code POLocalRearrange} over the sort keys, the sort itself and, when the sort carries a
     * limit other than -1, a {@code POLimit}. When {@code pairArr} is {@code null} the key is a
     * star projection; otherwise the sort's own plans are used and the key type is derived from
     * the first sort plan. The rearrange's key type is also published under the
     * {@code pig.reduce.keytype} property.
     *
     * @param pOSort the sort operator being compiled
     * @param sparkOperator the operator passed on to {@code startNew}
     * @param fileSpec the file spec passed on to {@code startNew}
     * @param fileSpec2 not used by this method
     * @param i the requested parallelism for the new operator
     * @param pairArr the sort columns, or {@code null} to sort on a star projection
     * @return the new Spark operator containing the sort
     * @throws PlanException if the plan cannot be built or the key type cannot be computed
     */
    private SparkOperator getSortJob(POSort pOSort, SparkOperator sparkOperator, FileSpec fileSpec, FileSpec fileSpec2, int i, Pair<POProject, Byte>[] pairArr) throws PlanException {
        SparkOperator sortOp = startNew(fileSpec, sparkOperator, null);
        List<PhysicalPlan> keyPlans = new ArrayList<>();
        byte firstKeyType = 0;
        if (pairArr != null) {
            keyPlans.addAll(pOSort.getSortPlans());
            try {
                FindKeyTypeVisitor keyTypeFinder = new FindKeyTypeVisitor(pOSort.getSortPlans().get(0));
                keyTypeFinder.visit();
                firstKeyType = keyTypeFinder.keyType;
            } catch (VisitorException e) {
                throw new PlanException("Internal error. Could not compute key type of sort operator.", 2035, (byte) 4, e);
            }
        } else {
            // No explicit sort columns: key on the whole record via a star projection.
            PhysicalPlan starPlan = new PhysicalPlan();
            POProject star = new POProject(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
            star.setStar(true);
            star.setOverloaded(false);
            star.setResultType((byte) 110);
            starPlan.add(star);
            keyPlans.add(starPlan);
        }

        POLocalRearrange rearrange = new POLocalRearrange(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
        try {
            rearrange.setIndex(0);
            // Only a single explicit sort column keeps its own type; every other case uses type code 110
            // (presumably the tuple type - confirm against DataType).
            byte keyType;
            if (pairArr == null || pairArr.length > 1) {
                keyType = (byte) 110;
            } else {
                keyType = firstKeyType;
            }
            rearrange.setKeyType(keyType);
            rearrange.setPlans(keyPlans);
            rearrange.setResultType((byte) 110);
            rearrange.addOriginalLocation(pOSort.getAlias(), pOSort.getOriginalLocations());
            sortOp.physicalPlan.addAsLeaf(rearrange);
            sortOp.setGlobalSort(true);
            this.pigContext.getProperties().setProperty("pig.reduce.keytype", Byte.toString(rearrange.getKeyType()));
            sortOp.requestedParallelism = i;
            sortOp.physicalPlan.addAsLeaf(pOSort);

            long sortLimit = pOSort.getLimit();
            if (sortLimit != -1) {
                POLimit limitOp = new POLimit(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
                limitOp.setLimit(sortLimit);
                sortOp.physicalPlan.addAsLeaf(limitOp);
                sortOp.markLimitAfterSort();
            }
            return sortOp;
        } catch (ExecException e2) {
            throw new PlanException("Unable to set index on newly created POLocalRearrange.", 2058, (byte) 4, e2);
        }
    }

    /**
     * Turns {@code sparkOperator} into a sampling job and returns it.
     *
     * <p>The following leaves are appended to its physical plan, in order:
     * <ol>
     *   <li>a Poisson sampler (see {@code addSampleOperatorForSkewedJoin});</li>
     *   <li>a {@code POForEach} over the transform plans: {@code list} when given (only its last
     *       plan flagged for flattening), otherwise one projection per sort column, or a single
     *       star projection when {@code getSortCols} returns {@code null};</li>
     *   <li>a {@code POSampleSortSpark} built from {@code pOSort};</li>
     *   <li>a {@code POForEach} emitting the constant {@code i} followed by column 1;</li>
     *   <li>a {@code POForEach} applying the UDF {@code str} (constructed with {@code strArr})
     *       to a star projection.</li>
     * </ol>
     * Finally the requested parallelism is set to 1 and the operator is marked as a sampler.
     *
     * @param pOSort the sort describing the sample ordering
     * @param sparkOperator the operator to extend; also the return value
     * @param list explicit transform plans, or {@code null} to derive them from the sort columns
     * @param i the parallelism value embedded as a constant in the sampled output
     * @param str class name of the UDF applied to the sample
     * @param strArr constructor arguments for that UDF
     * @return {@code sparkOperator}, now configured as a sampler
     * @throws SparkCompilerException (a {@code VisitorException}) if a sort column is not a projection
     */
    private SparkOperator getSamplingJob(POSort pOSort, SparkOperator sparkOperator, List<PhysicalPlan> list, int i, String str, String[] strArr) throws PlanException, VisitorException, ExecException {
        addSampleOperatorForSkewedJoin(sparkOperator);

        List<Boolean> transformFlatten = new ArrayList<>();
        List<PhysicalPlan> transformPlans = new ArrayList<>();
        if (list != null) {
            for (int idx = 0; idx < list.size(); idx++) {
                transformPlans.add(list.get(idx));
                // Only the last supplied plan is flagged for flattening.
                transformFlatten.add(Boolean.valueOf(idx == list.size() - 1));
            }
        } else {
            Pair<POProject, Byte>[] sortCols = getSortCols(pOSort.getSortPlans());
            if (sortCols != null) {
                for (Pair<POProject, Byte> sortCol : sortCols) {
                    if (sortCol == null) {
                        throw new SparkCompilerException("Internal exception. Could not create a sampler job", 2174, (byte) 4);
                    }
                    PhysicalPlan colPlan = new PhysicalPlan();
                    try {
                        colPlan.add(sortCol.first.clone());
                        transformPlans.add(colPlan);
                        transformFlatten.add(false);
                    } catch (CloneNotSupportedException e) {
                        throw new AssertionError("Error cloning project caught exception" + e);
                    }
                }
            } else {
                // getSortCols returned null: fall back to projecting the whole record.
                PhysicalPlan starPlan = new PhysicalPlan();
                POProject star = new POProject(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
                star.setStar(true);
                star.setOverloaded(false);
                star.setResultType((byte) 110);
                starPlan.add(star);
                transformPlans.add(starPlan);
                transformFlatten.add(false);
            }
        }
        POForEach transform = new POForEach(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)), -1, transformPlans, transformFlatten);
        sparkOperator.physicalPlan.addAsLeaf(transform);
        sparkOperator.physicalPlan.addAsLeaf(new POSampleSortSpark(pOSort));

        // Second foreach: (constant parallelism, column 1).
        PhysicalPlan columnPlan = new PhysicalPlan();
        POProject column = new POProject(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
        column.setColumn(1);
        column.setResultType((byte) 120);
        column.setOverloaded(true);
        columnPlan.add(column);

        PhysicalPlan parallelismPlan = new PhysicalPlan();
        ConstantExpression parallelism = new ConstantExpression(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
        parallelism.setRequestedParallelism(i);
        parallelism.setValue(Integer.valueOf(i));
        parallelism.setResultType((byte) 10);
        parallelismPlan.add(parallelism);

        List<PhysicalPlan> pairPlans = new ArrayList<>();
        pairPlans.add(parallelismPlan);
        pairPlans.add(columnPlan);
        List<Boolean> pairFlatten = new ArrayList<>();
        pairFlatten.add(false);
        pairFlatten.add(false);
        POForEach pairForEach = new POForEach(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)), -1, pairPlans, pairFlatten);
        sparkOperator.physicalPlan.addAsLeaf(pairForEach);

        // Third foreach: feed the whole record to the supplied UDF.
        PhysicalPlan udfPlan = new PhysicalPlan();
        POProject udfInput = new POProject(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
        udfInput.setResultType((byte) 110);
        udfInput.setStar(true);
        udfPlan.add(udfInput);
        List<PhysicalOperator> udfInputs = new ArrayList<>();
        udfInputs.add(udfInput);
        POUserFunc udf = new POUserFunc(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)), -1, udfInputs, new FuncSpec(str, strArr));
        udfPlan.add(udf);
        udfPlan.connect((PhysicalOperator) udfInput, (PhysicalOperator) udf);

        List<PhysicalPlan> udfPlans = new ArrayList<>();
        udfPlans.add(udfPlan);
        List<Boolean> udfFlatten = new ArrayList<>();
        udfFlatten.add(false);
        POForEach udfForEach = new POForEach(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)), -1, udfPlans, udfFlatten);
        sparkOperator.physicalPlan.addAsLeaf(udfForEach);

        sparkOperator.requestedParallelism = 1;
        sparkOperator.markSampler();
        return sparkOperator;
    }

    /**
     * Describes each sort plan by the first leaf of that plan.
     *
     * <p>For every plan a pair is produced holding the leaf as a {@link POProject} (or
     * {@code null} when the leaf is some other operator) together with the leaf's result type.
     * If any leaf is a star projection the whole result is {@code null}.
     *
     * @param list the sort plans
     * @return one pair per plan, or {@code null} if a star projection is encountered
     * @throws PlanException if {@code list} is {@code null}
     */
    private Pair<POProject, Byte>[] getSortCols(List<PhysicalPlan> list) throws PlanException, ExecException {
        if (list == null) {
            throw new PlanException("No expression plan found in POSort.", 2026, (byte) 4);
        }
        Pair<POProject, Byte>[] sortCols = new Pair[list.size()];
        int slot = 0;
        for (PhysicalPlan sortPlan : list) {
            PhysicalOperator leaf = sortPlan.getLeaves().get(0);
            POProject projection = null;
            if (leaf instanceof POProject) {
                if (((POProject) leaf).isStar()) {
                    return null;
                }
                projection = (POProject) leaf;
            }
            sortCols[slot++] = new Pair<>(projection, Byte.valueOf(leaf.getResultType()));
        }
        return sortCols;
    }

    /**
     * Appends a {@link POBroadcastSpark} leaf, configured with the given broadcast variable
     * name, to the operator's physical plan.
     *
     * @param sparkOperator the Spark operator whose plan receives the broadcast operator
     * @param str the broadcast variable name
     * @throws PlanException if the operator cannot be added as a leaf
     */
    private void buildBroadcastForSkewedJoin(SparkOperator sparkOperator, String str) throws PlanException {
        OperatorKey broadcastKey = new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope));
        POBroadcastSpark broadcast = new POBroadcastSpark(broadcastKey);
        broadcast.setBroadcastedVariableName(str);
        sparkOperator.physicalPlan.addAsLeaf(broadcast);
    }

    /**
     * Creates the sampling Spark operator for a skewed join.
     *
     * <p>A new Spark operator is created with a clone of the first compiled input's physical
     * plan. The join plans of the skewed join's first predecessor are used both as the sort keys
     * of a synthetic {@code POSort} and as the leading transform plans; one more plan is appended
     * that applies {@code GetMemNumRows} to a star projection. The result is handed to
     * {@code getSamplingJob} with {@code PartitionSkewedKeys} as the sampling UDF, parameterised
     * by the {@code PIG_SKEWEDJOIN_REDUCE_MEMUSAGE} property (default 0.3) and the
     * {@code pig.skewedjoin.reduce.maxtuple} property (default "0").
     *
     * @param pOSkewedJoin the skewed join being compiled
     * @return the configured sampling operator
     * @throws SparkCompilerException (a {@code VisitorException}) wrapping any failure, code 2034
     */
    private SparkOperator getSkewedJoinSampleJob(POSkewedJoin pOSkewedJoin) throws PlanException, VisitorException {
        try {
            SparkOperator sampleOp = new SparkOperator(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
            sampleOp.physicalPlan = this.compiledInputs[0].physicalPlan.m92clone();

            List<PhysicalPlan> joinKeyPlans = pOSkewedJoin.getJoinPlans().get(this.physicalPlan.getPredecessors(pOSkewedJoin).get(0));
            // One 'false' flag per join-key plan for the synthetic sort.
            List<Boolean> sortFlags = new ArrayList<>();
            for (int idx = 0; idx < joinKeyPlans.size(); idx++) {
                sortFlags.add(false);
            }
            POSort sampleSort = new POSort(pOSkewedJoin.getOperatorKey(), pOSkewedJoin.getRequestedParallelism(), null, joinKeyPlans, sortFlags, null);

            List<PhysicalPlan> transformPlans = new ArrayList<>(joinKeyPlans);

            // Extra transform plan: GetMemNumRows applied to the whole record.
            POProject star = new POProject(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)));
            star.setResultType((byte) 110);
            star.setStar(true);
            List<PhysicalOperator> udfInputs = new ArrayList<>();
            udfInputs.add(star);
            PhysicalPlan memRowsPlan = new PhysicalPlan();
            POUserFunc memRows = new POUserFunc(new OperatorKey(this.scope, this.nig.getNextNodeId(this.scope)), -1, udfInputs, new FuncSpec(GetMemNumRows.class.getName(), (String[]) null));
            memRows.setResultType((byte) 110);
            memRowsPlan.add(memRows);
            memRowsPlan.add(star);
            memRowsPlan.connect((PhysicalOperator) star, (PhysicalOperator) memRows);
            transformPlans.add(memRowsPlan);

            int parallelism = pOSkewedJoin.getRequestedParallelism();
            String samplerUdf = PartitionSkewedKeys.class.getName();
            String[] samplerArgs = {
                this.pigContext.getProperties().getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE, String.valueOf(0.3f)),
                this.pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0")
            };
            return getSamplingJob(sampleSort, sampleOp, transformPlans, parallelism, samplerUdf, samplerArgs);
        } catch (Exception e) {
            throw new SparkCompilerException("Error compiling operator " + pOSkewedJoin.getClass().getSimpleName(), 2034, (byte) 4, e);
        }
    }
}
