package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;

import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.class */
public class SecondaryKeyOptimizerSpark extends SparkOpPlanVisitor implements SecondaryKeyOptimizer {
    private static final Log LOG = LogFactory.getLog(SecondaryKeyOptimizerSpark.class);
    private int numSortRemoved;
    private int numDistinctChanged;
    private int numUseSecondaryKey;

    public SecondaryKeyOptimizerSpark(SparkOperPlan sparkOperPlan) {
        super(sparkOperPlan, new DepthFirstWalker(sparkOperPlan));
        this.numSortRemoved = 0;
        this.numDistinctChanged = 0;
        this.numUseSecondaryKey = 0;
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor
    public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException {
        LinkedList<POLocalRearrange> physicalOperators = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLocalRearrange.class);
        if (physicalOperators.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No POLocalRearranges found in the spark operator" + sparkOperator.getOperatorKey() + ". Skipping secondary key optimization.");
                return;
            }
            return;
        }
        for (POLocalRearrange pOLocalRearrange : physicalOperators) {
            try {
                PhysicalPlan mapPlan = getMapPlan(sparkOperator.physicalPlan, pOLocalRearrange);
                try {
                    PhysicalPlan reducePlan = getReducePlan(sparkOperator.physicalPlan, pOLocalRearrange);
                    List<PhysicalOperator> roots = reducePlan.getRoots();
                    if (roots.get(0) instanceof POGlobalRearrangeSpark) {
                        List<PhysicalOperator> predecessors = sparkOperator.physicalPlan.getPredecessors(roots.get(0));
                        if (predecessors != null && predecessors.size() >= 2) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Current code does not enable secondarykey optimization when  join case is encounted");
                                return;
                            }
                            return;
                        }
                    }
                    if (mapPlan.getOperator(pOLocalRearrange.getOperatorKey()) == null) {
                        mapPlan = PlanHelper.getLocalRearrangePlanFromSplit(mapPlan, pOLocalRearrange.getOperatorKey());
                    }
                    SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo applySecondaryKeySort = new SparkSecondaryKeyOptimizerUtil().applySecondaryKeySort(mapPlan, reducePlan);
                    if (applySecondaryKeySort != null) {
                        this.numSortRemoved += applySecondaryKeySort.getNumSortRemoved();
                        this.numDistinctChanged += applySecondaryKeySort.getNumDistinctChanged();
                        this.numUseSecondaryKey += applySecondaryKeySort.getNumUseSecondaryKey();
                    }
                } catch (PlanException e) {
                    throw new VisitorException(e);
                }
            } catch (PlanException e2) {
                throw new VisitorException(e2);
            }
        }
    }

    private PhysicalPlan getMapPlan(PhysicalPlan physicalPlan, POLocalRearrange pOLocalRearrange) throws VisitorException, PlanException {
        PhysicalPlan physicalPlan2 = new PhysicalPlan();
        physicalPlan2.addAsRoot(pOLocalRearrange);
        List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(pOLocalRearrange);
        while (true) {
            List<PhysicalOperator> list = predecessors;
            if (list == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("there is nothing to backward search");
                }
            } else if (list.size() == 1) {
                PhysicalOperator physicalOperator = list.get(0);
                if (!(physicalOperator instanceof POLocalRearrange)) {
                    physicalPlan2.addAsRoot(physicalOperator);
                    predecessors = physicalPlan.getPredecessors(physicalOperator);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("Finishing to find the mapPlan between preLR and currentLR.");
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("the size of predecessor of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
            }
        }
        return physicalPlan2;
    }

    private PhysicalPlan getReducePlan(PhysicalPlan physicalPlan, POLocalRearrange pOLocalRearrange) throws PlanException {
        PhysicalPlan physicalPlan2 = new PhysicalPlan();
        List<PhysicalOperator> successors = physicalPlan.getSuccessors(pOLocalRearrange);
        while (true) {
            List<PhysicalOperator> list = successors;
            if (list == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("there is nothing to forward search");
                }
            } else if (list.size() == 1) {
                PhysicalOperator physicalOperator = list.get(0);
                if (!(physicalOperator instanceof POLocalRearrange)) {
                    physicalPlan2.addAsLeaf(physicalOperator);
                    successors = physicalPlan.getSuccessors(physicalOperator);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("Finishing to find the ReducePlan between currentLR and netxtLR.");
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("the size of successors of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
            }
        }
        return physicalPlan2;
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer
    public int getNumSortRemoved() {
        return this.numSortRemoved;
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer
    public int getNumDistinctChanged() {
        return this.numDistinctChanged;
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer
    public int getNumUseSecondaryKey() {
        return this.numUseSecondaryKey;
    }
}
