package org.apache.pig.backend.hadoop.executionengine.spark;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;

/**
 * Builds the Spark job graph for a {@link SparkOperPlan}.
 *
 * <p>The plan is walked in dependency order. For each {@link SparkOperator}, its physical plan is
 * converted leaf by leaf into {@link RDD}s using the {@link RDDConverter} registered for each
 * physical operator class. A {@link NativeSparkOperator} is run directly instead. After conversion,
 * job statistics for the operator's {@link POStore}s are recorded in {@link SparkPigStats}.
 */
public class JobGraphBuilder extends SparkOpPlanVisitor {
    private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);

    /**
     * Job id recorded for a store when no unseen Spark job id exists for the job group and the
     * store's RDD has zero partitions.
     */
    public static final int NULLPART_JOB_ID = -1;

    private final Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap;
    private final SparkPigStats sparkStats;
    private final JavaSparkContext sparkContext;
    private final JobMetricsListener jobMetricsListener;
    private final String jobGroupID;
    /** Spark job ids of {@link #jobGroupID} that have already been attributed to stores. */
    private final Set<Integer> seenJobIDs = new HashSet<>();
    private final SparkOperPlan sparkPlan;
    /** RDD of the most recently converted leaf of each Spark operator, keyed by the Spark operator. */
    private final Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap<>();
    /** RDD produced for each converted physical operator, keyed by the physical operator. */
    private final Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap<>();
    private final JobConf jobConf;
    private final PigContext pc;

    /**
     * Rewrites the parallelism constant of a sampling plan to a parallelism computed at runtime.
     *
     * <p>The plan is walked depth-first. Once a {@link POPoissonSample} has been visited, the
     * {@link ConstantExpression} that holds an {@link Integer} and has requested parallelism
     * {@code -1} gets both its value and its requested parallelism set to the runtime parallelism.
     * Encountering a second such constant after the sample operator is an error.
     */
    public static class ParallelConstantVisitor extends PhyPlanVisitor {
        /** Runtime parallelism to substitute into the constant. */
        private final int rp;
        /** Whether a constant has already been rewritten. */
        private boolean replaced = false;
        /** Set once the walk has passed the Poisson sample operator. */
        private boolean isAfterSampleOperator = false;

        /**
         * @param physicalPlan the sampling plan to rewrite
         * @param i the runtime parallelism to substitute
         */
        public ParallelConstantVisitor(PhysicalPlan physicalPlan, int i) {
            super(physicalPlan, new DepthFirstWalker(physicalPlan));
            this.rp = i;
        }

        /**
         * Replaces the parallelism placeholder constant once the sample operator has been seen.
         *
         * @throws VisitorException if more than one matching constant is found
         */
        @Override
        public void visitConstant(ConstantExpression constantExpression) throws VisitorException {
            boolean isPlaceholder = this.isAfterSampleOperator
                    && constantExpression.getRequestedParallelism() == -1
                    && constantExpression.getValue() instanceof Integer;
            if (!isPlaceholder) {
                return;
            }
            if (this.replaced) {
                throw new VisitorException("Invalid reduce plan: more than one ConstantExpression found in sampling job");
            }
            constantExpression.setValue(Integer.valueOf(this.rp));
            constantExpression.setRequestedParallelism(this.rp);
            this.replaced = true;
        }

        /** Marks that subsequently visited constants follow the sample operator. */
        @Override
        public void visitPoissonSample(POPoissonSample pOPoissonSample) {
            this.isAfterSampleOperator = true;
        }
    }

    /**
     * @param sparkOperPlan the Spark operator plan to walk
     * @param convertMap converter to use for each physical operator class
     * @param sparkPigStats stats object that receives per-store job statistics
     * @param javaSparkContext Spark context used to look up job ids and record stats
     * @param jobMetricsListener listener passed through when waiting for job stats
     * @param jobGroupID Spark job group whose job ids are attributed to stores
     * @param jobConf configuration used for file-system access and replication settings
     * @param pigContext Pig context; its properties supply {@code stop.on.failure}
     */
    public JobGraphBuilder(SparkOperPlan sparkOperPlan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap, SparkPigStats sparkPigStats, JavaSparkContext javaSparkContext, JobMetricsListener jobMetricsListener, String jobGroupID, JobConf jobConf, PigContext pigContext) {
        super(sparkOperPlan, new DependencyOrderWalker(sparkOperPlan, true));
        this.sparkPlan = sparkOperPlan;
        this.convertMap = convertMap;
        this.sparkStats = sparkPigStats;
        this.sparkContext = javaSparkContext;
        this.jobMetricsListener = jobMetricsListener;
        this.jobGroupID = jobGroupID;
        this.jobConf = jobConf;
        this.pc = pigContext;
    }

    /**
     * Converts one Spark operator into RDDs and records its store statistics.
     *
     * <p>Before conversion, merge-join index files get their replication raised. After conversion,
     * {@code finish} is called on the plan's UDFs.
     *
     * @throws VisitorException wrapping any failure
     */
    @Override
    public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException {
        new PhyPlanSetter(sparkOperator.physicalPlan).visit();
        try {
            setReplicationForMergeJoin(sparkOperator.physicalPlan);
            sparkOperToRDD(sparkOperator);
            finishUDFs(sparkOperator.physicalPlan);
        } catch (InterruptedException e) {
            // Throwing InterruptedException clears the interrupt flag; restore it for callers.
            Thread.currentThread().interrupt();
            throw new VisitorException("fail to get the rdds of this spark operator: ", e);
        } catch (Exception e) {
            throw new VisitorException("fail to get the rdds of this spark operator: ", e);
        }
    }

    /**
     * Collects the index files of all {@link POMergeJoin}s in the plan, raises their replication,
     * and clears each join's index-file reference.
     */
    private void setReplicationForMergeJoin(PhysicalPlan physicalPlan) throws IOException {
        List<Path> indexFiles = new ArrayList<>();
        List<POMergeJoin> mergeJoins = PlanHelper.getPhysicalOperators(physicalPlan, POMergeJoin.class);
        for (POMergeJoin mergeJoin : mergeJoins) {
            String indexFile = mergeJoin.getIndexFile();
            if (indexFile != null) {
                indexFiles.add(new Path(indexFile));
            }
            // NOTE(review): presumably cleared so POMergeJoin does not read the index through the
            // Hadoop distributed cache in Spark mode — confirm against POMergeJoin.
            mergeJoin.setIndexFile(null);
        }
        setReplicationForFiles(indexFiles);
    }

    /**
     * Sets each file's replication to {@link MRConfiguration#SUMIT_REPLICATION} from the job
     * configuration, defaulting to 10.
     */
    private void setReplicationForFiles(List<Path> files) throws IOException {
        FileSystem fileSystem = FileSystem.get(this.jobConf);
        short replication = (short) this.jobConf.getInt(MRConfiguration.SUMIT_REPLICATION, 10);
        for (Path file : files) {
            fileSystem.setReplication(file, replication);
        }
    }

    /** Runs {@link UDFFinishVisitor} over the plan in dependency order. */
    private void finishUDFs(PhysicalPlan physicalPlan) throws VisitorException {
        try {
            new UDFFinishVisitor(physicalPlan, new DependencyOrderWalker(physicalPlan)).visit();
        } catch (VisitorException e) {
            // NOTE(review): error source byte 4 is presumably PigException.BUG — confirm.
            throw new VisitorException("Error while calling finish method on UDFs.", 2121, (byte) 4, e);
        }
    }

    /**
     * Converts a Spark operator into RDDs (or runs it, if native) and records stats for its stores.
     */
    private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException, ExecException {
        if (sparkOperator instanceof NativeSparkOperator) {
            ((NativeSparkOperator) sparkOperator).runJob();
            return;
        }
        Set<OperatorKey> predecessorKeys = collectPredecessorKeys(sparkOperator);
        Exception failure = convertLeaves(sparkOperator, predecessorKeys);
        addStoreJobStats(sparkOperator, failure);
    }

    /** Returns the operator keys of {@code sparkOperator}'s predecessors in the Spark plan. */
    private Set<OperatorKey> collectPredecessorKeys(SparkOperator sparkOperator) {
        Set<OperatorKey> keys = new HashSet<>();
        List<SparkOperator> predecessors = this.sparkPlan.getPredecessors(sparkOperator);
        if (predecessors != null) {
            for (SparkOperator predecessor : predecessors) {
                keys.add(predecessor.getOperatorKey());
            }
        }
        return keys;
    }

    /**
     * Converts every leaf of the operator's physical plan into RDDs.
     *
     * <p>A failing leaf is logged and remembered. If {@code stop.on.failure} is true in the Pig
     * properties, the failure is instead rethrown immediately with its cause.
     *
     * @return the last exception raised by a leaf conversion, or {@code null} if all succeeded
     * @throws ExecException on the first failure when {@code stop.on.failure} is enabled
     */
    private Exception convertLeaves(SparkOperator sparkOperator, Set<OperatorKey> predecessorKeys) throws ExecException {
        List<PhysicalOperator> leaves = sparkOperator.physicalPlan.getLeaves();
        if (LOG.isDebugEnabled()) {
            LOG.debug("sparkOperator.physicalPlan have " + leaves.size() + " leaves");
        }
        Exception failure = null;
        for (PhysicalOperator leaf : leaves) {
            try {
                physicalToRDD(sparkOperator, sparkOperator.physicalPlan, leaf, predecessorKeys);
                this.sparkOpRdds.put(sparkOperator.getOperatorKey(), this.physicalOpRdds.get(leaf.getOperatorKey()));
            } catch (Exception e) {
                LOG.error("throw exception in sparkOperToRDD: ", e);
                failure = e;
                boolean stopOnFailure = Boolean.parseBoolean(this.pc.getProperties().getProperty("stop.on.failure", "false"));
                if (stopOnFailure) {
                    // Error source byte 16 is presumably PigException.REMOTE_ENVIRONMENT.
                    // Pass the cause so the original stack trace is not lost.
                    throw new ExecException(e.getMessage(), 6017, (byte) 16, e);
                }
            }
        }
        return failure;
    }

    /**
     * Records job statistics for each {@link POStore} in the operator's plan, in operator-key
     * order.
     *
     * <p>If a leaf conversion failed, every store gets failed-job stats. Otherwise, the unseen
     * job ids of the job group are paired with the stores in order. If there are no unseen job
     * ids, the first store must have an RDD with zero partitions; it is recorded with
     * {@link #NULLPART_JOB_ID} and the remaining stores are skipped.
     *
     * @throws RuntimeException if there are no unseen job ids but the first store's RDD has
     *     partitions
     */
    private void addStoreJobStats(SparkOperator sparkOperator, Exception failure) throws InterruptedException, VisitorException, JobCreationException, ExecException {
        List<POStore> stores = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POStore.class);
        Collections.sort(stores);
        if (stores.isEmpty()) {
            return;
        }
        if (failure != null) {
            for (POStore store : stores) {
                SparkStatsUtil.addFailJobStats(sparkOperator.name().concat("_fail"), store, sparkOperator, this.sparkStats, failure);
            }
            return;
        }
        List<Integer> jobIDs = getJobIDs(this.seenJobIDs);
        if (jobIDs.isEmpty()) {
            POStore firstStore = stores.get(0);
            if (this.physicalOpRdds.get(firstStore.getOperatorKey()).partitions().length != 0) {
                throw new RuntimeException("Expected at least one unseen jobID  in this call to getJobIdsForGroup, but got 0");
            }
            this.sparkStats.addJobStats(firstStore, sparkOperator, NULLPART_JOB_ID, null, this.sparkContext);
            return;
        }
        // NOTE(review): assumes the stores, sorted by operator key, had their Spark jobs submitted
        // in that same order. Throws IndexOutOfBoundsException if there are fewer unseen job ids
        // than stores.
        int nextJob = 0;
        for (POStore store : stores) {
            SparkStatsUtil.waitForJobAddStats(jobIDs.get(nextJob++).intValue(), store, sparkOperator, this.jobMetricsListener, this.sparkContext, this.sparkStats);
        }
    }

    /**
     * Recursively converts {@code physicalOperator} and its predecessors in {@code physicalPlan}
     * into RDDs, storing each result in {@link #physicalOpRdds}.
     *
     * <p>The RDD inputs of an operator are taken in this order:
     * <ol>
     *   <li>keys registered for it in {@code sparkOperator}'s multi-query optimization connection
     *       map;</li>
     *   <li>then either its predecessors in the plan, or, when the plan returns no predecessor
     *       list, {@code predecessorKeys}.</li>
     * </ol>
     *
     * <p>A {@link POSplit} is not converted itself. Instead, the single leaf of each of its
     * sub-plans is converted, using the split's input keys as predecessors.
     *
     * @throws IllegalArgumentException if no converter is registered for the operator's class, or
     *     the converter returns {@code null}
     * @throws RuntimeException if a split sub-plan does not have exactly one leaf
     */
    private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan physicalPlan, PhysicalOperator physicalOperator, Set<OperatorKey> predecessorKeys) throws IOException {
        List<PhysicalOperator> predecessors = getPredecessors(physicalPlan, physicalOperator);
        LinkedHashSet<OperatorKey> inputKeys = new LinkedHashSet<>();
        addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, inputKeys);
        if (predecessors != null) {
            for (PhysicalOperator predecessor : predecessors) {
                physicalToRDD(sparkOperator, physicalPlan, predecessor, predecessorKeys);
                inputKeys.add(predecessor.getOperatorKey());
            }
        } else if (predecessorKeys != null) {
            inputKeys.addAll(predecessorKeys);
        }
        if (physicalOperator instanceof POSplit) {
            for (PhysicalPlan subPlan : ((POSplit) physicalOperator).getPlans()) {
                List<PhysicalOperator> subLeaves = subPlan.getLeaves();
                if (subLeaves.size() != 1) {
                    throw new RuntimeException("the size of the leaves of successorPlan should be 1");
                }
                physicalToRDD(sparkOperator, subPlan, subLeaves.get(0), inputKeys);
            }
            return;
        }
        RDDConverter converter = this.convertMap.get(physicalOperator.getClass());
        if (converter == null) {
            throw new IllegalArgumentException("Pig on Spark does not support Physical Operator: " + physicalOperator);
        }
        LOG.info("Converting operator " + physicalOperator.getClass().getSimpleName() + " " + physicalOperator);
        List<RDD<Tuple>> inputRdds = sortPredecessorRDDs(inputKeys);
        if (converter instanceof FRJoinConverter) {
            setReplicatedInputs(physicalOperator, (FRJoinConverter) converter);
        }
        if (sparkOperator.isSkewedJoin() && converter instanceof SkewedJoinConverter) {
            ((SkewedJoinConverter) converter).setSkewedJoinPartitionFile(sparkOperator.getSkewedJoinPartitionFile());
        }
        adjustRuntimeParallelismForSkewedJoin(physicalOperator, sparkOperator, inputRdds);
        RDD<Tuple> converted = converter.convert(inputRdds, physicalOperator);
        if (converted == null) {
            throw new IllegalArgumentException("RDD should not be null after PhysicalOperator: " + physicalOperator);
        }
        this.physicalOpRdds.put(physicalOperator.getOperatorKey(), converted);
    }

    /**
     * Tells the FR-join converter the broadcast variable names of the operator's
     * {@link POBroadcastSpark} inputs.
     */
    private void setReplicatedInputs(PhysicalOperator physicalOperator, FRJoinConverter fRJoinConverter) {
        Set<String> broadcastNames = new HashSet<>();
        for (PhysicalOperator input : physicalOperator.getInputs()) {
            if (input instanceof POBroadcastSpark) {
                broadcastNames.add(((POBroadcastSpark) input).getBroadcastedVariableName());
            }
        }
        fRJoinConverter.setReplicatedInputs(broadcastNames);
    }

    /**
     * Returns the predecessors of {@code physicalOperator}.
     *
     * <p>For a {@link POJoinGroupSpark}, the operator's own predecessor list is used. Otherwise the
     * plan's predecessors are used; when there is more than one and the operator is not a
     * {@link POSkewedJoin}, that list is sorted in place.
     */
    private List<PhysicalOperator> getPredecessors(PhysicalPlan physicalPlan, PhysicalOperator physicalOperator) {
        if (physicalOperator instanceof POJoinGroupSpark) {
            return ((POJoinGroupSpark) physicalOperator).getPredecessors();
        }
        List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(physicalOperator);
        if (predecessors != null && predecessors.size() > 1 && !(physicalOperator instanceof POSkewedJoin)) {
            Collections.sort(predecessors);
        }
        return predecessors;
    }

    /**
     * Looks up the RDD for each key, preserving the set's iteration order. Despite the name, no
     * sorting happens here.
     */
    private List<RDD<Tuple>> sortPredecessorRDDs(LinkedHashSet<OperatorKey> linkedHashSet) {
        List<RDD<Tuple>> rdds = Lists.newArrayListWithCapacity(linkedHashSet.size());
        for (OperatorKey key : linkedHashSet) {
            rdds.add(this.physicalOpRdds.get(key));
        }
        return rdds;
    }

    /**
     * Adds to {@code set} the keys registered for {@code physicalOperator} in
     * {@code sparkOperator}'s multi-query optimization connection map, if any.
     */
    private void addPredsFromPrevoiousSparkOp(SparkOperator sparkOperator, PhysicalOperator physicalOperator, Set<OperatorKey> set) {
        List<OperatorKey> connected = sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey());
        if (connected == null) {
            return;
        }
        for (OperatorKey operatorKey : connected) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("add predecessor(OperatorKey:%s) for OperatorKey:%s", operatorKey, physicalOperator.getOperatorKey()));
            }
            set.add(operatorKey);
        }
    }

    /**
     * Returns the job ids of {@link #jobGroupID} not yet in {@code set}, sorted ascending, and adds
     * them to {@code set}.
     *
     * <p>Sorting makes the id-to-store pairing deterministic. {@code HashSet} iteration order is
     * not numeric once ids exceed the table capacity, whereas Spark assigns job ids from an
     * increasing counter.
     */
    private List<Integer> getJobIDs(Set<Integer> set) {
        int[] groupJobIds = this.sparkContext.statusTracker().getJobIdsForGroup(this.jobGroupID);
        Set<Integer> unseen = new HashSet<>(Arrays.asList(ArrayUtils.toObject(groupJobIds)));
        unseen.removeAll(set);
        List<Integer> result = new ArrayList<>(unseen);
        Collections.sort(result);
        set.addAll(result);
        return result;
    }

    /**
     * Rewrites the sampling plan's parallelism constant with a parallelism computed from the
     * input RDDs.
     *
     * <p>This only happens when {@code sparkOperator} is a sampler with successors and
     * {@code physicalOperator} is a {@link POPoissonSampleSpark}.
     */
    private void adjustRuntimeParallelismForSkewedJoin(PhysicalOperator physicalOperator, SparkOperator sparkOperator, List<RDD<Tuple>> list) throws VisitorException {
        if (sparkOperator.isSampler() && this.sparkPlan.getSuccessors(sparkOperator) != null && physicalOperator instanceof POPoissonSampleSpark) {
            // get() is evaluated as in the original call site; getParallelism is static.
            SparkPigContext.get();
            new ParallelConstantVisitor(sparkOperator.physicalPlan, SparkPigContext.getParallelism(list, physicalOperator)).visit();
        }
    }
}
