/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.adaptive;

import java.io.Serializable;
import org.apache.spark.MapOutputStatistics;
import org.apache.spark.sql.catalyst.rules.Rule;
import org.apache.spark.sql.execution.CoalescedPartitionSpec;
import org.apache.spark.sql.execution.CoalescedPartitionSpec$;
import org.apache.spark.sql.execution.PartialReducerPartitionSpec;
import org.apache.spark.sql.execution.ShufflePartitionSpec;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec$;
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadRule;
import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil$;
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec;
import org.apache.spark.sql.execution.exchange.REBALANCE_PARTITIONS_BY_COL$;
import org.apache.spark.sql.execution.exchange.REBALANCE_PARTITIONS_BY_NONE$;
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike;
import org.apache.spark.sql.execution.exchange.ShuffleOrigin;
import org.apache.spark.sql.internal.SQLConf$;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.runtime.AbstractPartialFunction;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction1;

public final class OptimizeSkewInRebalancePartitions$
extends Rule<SparkPlan>
implements AQEShuffleReadRule {
    public static OptimizeSkewInRebalancePartitions$ MODULE$;
    private final Seq<ShuffleOrigin> supportedShuffleOrigins;

    static {
        new OptimizeSkewInRebalancePartitions$();
    }

    @Override
    public boolean isSupported(ShuffleExchangeLike shuffle) {
        return AQEShuffleReadRule.isSupported$(this, shuffle);
    }

    @Override
    public Seq<ShuffleOrigin> supportedShuffleOrigins() {
        return this.supportedShuffleOrigins;
    }

    private Seq<ShufflePartitionSpec> optimizeSkewedPartitions(int shuffleId, long[] bytesByPartitionId, long targetSize) {
        double smallPartitionFactor = BoxesRunTime.unboxToDouble((Object)this.conf().getConf(SQLConf$.MODULE$.ADAPTIVE_REBALANCE_PARTITIONS_SMALL_PARTITION_FACTOR()));
        return (Seq)new ArrayOps.ofLong(Predef$.MODULE$.longArrayOps(bytesByPartitionId)).indices().flatMap((Function1 & Serializable & scala.Serializable)reduceIndex -> OptimizeSkewInRebalancePartitions$.$anonfun$optimizeSkewedPartitions$1(bytesByPartitionId, targetSize, shuffleId, smallPartitionFactor, BoxesRunTime.unboxToInt((Object)reduceIndex)), IndexedSeq$.MODULE$.canBuildFrom());
    }

    public SparkPlan org$apache$spark$sql$execution$adaptive$OptimizeSkewInRebalancePartitions$$tryOptimizeSkewedPartitions(ShuffleQueryStageExec shuffle) {
        long advisorySize = BoxesRunTime.unboxToLong((Object)this.conf().getConf(SQLConf$.MODULE$.ADVISORY_PARTITION_SIZE_IN_BYTES()));
        Option<MapOutputStatistics> mapStats = shuffle.mapStats();
        if (mapStats.isEmpty() || new ArrayOps.ofLong(Predef$.MODULE$.longArrayOps(((MapOutputStatistics)mapStats.get()).bytesByPartitionId())).forall((Function1)(JFunction1.mcZJ.sp & Serializable & scala.Serializable)x$3 -> x$3 <= advisorySize)) {
            return shuffle;
        }
        Seq<ShufflePartitionSpec> newPartitionsSpec = this.optimizeSkewedPartitions(((MapOutputStatistics)mapStats.get()).shuffleId(), ((MapOutputStatistics)mapStats.get()).bytesByPartitionId(), advisorySize);
        return newPartitionsSpec.length() == ((MapOutputStatistics)mapStats.get()).bytesByPartitionId().length ? shuffle : AQEShuffleReadExec$.MODULE$.apply(shuffle, newPartitionsSpec);
    }

    public SparkPlan apply(SparkPlan plan) {
        if (!BoxesRunTime.unboxToBoolean((Object)this.conf().getConf(SQLConf$.MODULE$.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED()))) {
            return plan;
        }
        return (SparkPlan)plan.transformUp((PartialFunction)new scala.Serializable(){
            public static final long serialVersionUID = 0L;

            public final <A1 extends SparkPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                ShuffleQueryStageExec shuffleQueryStageExec;
                A1 A1 = x1;
                Object object = A1 instanceof ShuffleQueryStageExec && OptimizeSkewInRebalancePartitions$.MODULE$.isSupported((shuffleQueryStageExec = (ShuffleQueryStageExec)A1).shuffle()) ? OptimizeSkewInRebalancePartitions$.MODULE$.org$apache$spark$sql$execution$adaptive$OptimizeSkewInRebalancePartitions$$tryOptimizeSkewedPartitions(shuffleQueryStageExec) : function1.apply(x1);
                return (B1)object;
            }

            public final boolean isDefinedAt(SparkPlan x1) {
                ShuffleQueryStageExec shuffleQueryStageExec;
                SparkPlan sparkPlan = x1;
                boolean bl = sparkPlan instanceof ShuffleQueryStageExec && OptimizeSkewInRebalancePartitions$.MODULE$.isSupported((shuffleQueryStageExec = (ShuffleQueryStageExec)sparkPlan).shuffle());
                return bl;
            }
        });
    }

    public static final /* synthetic */ Seq $anonfun$optimizeSkewedPartitions$1(long[] bytesByPartitionId$1, long targetSize$1, int shuffleId$1, double smallPartitionFactor$1, int reduceIndex) {
        List list;
        long bytes = bytesByPartitionId$1[reduceIndex];
        if (bytes > targetSize$1) {
            Option<Seq<PartialReducerPartitionSpec>> newPartitionSpec = ShufflePartitionsUtil$.MODULE$.createSkewPartitionSpecs(shuffleId$1, reduceIndex, targetSize$1, smallPartitionFactor$1);
            if (newPartitionSpec.isEmpty()) {
                CoalescedPartitionSpec coalescedPartitionSpec = CoalescedPartitionSpec$.MODULE$.apply(reduceIndex, reduceIndex + 1, bytes);
                list = Nil$.MODULE$.$colon$colon((Object)coalescedPartitionSpec);
            } else {
                MODULE$.logDebug((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("For shuffle ").append(shuffleId$1).append(", partition ").append(reduceIndex).append(" is skew, ").append("split it into ").append(((SeqLike)newPartitionSpec.get()).size()).append(" parts.").toString());
                list = (Seq)newPartitionSpec.get();
            }
        } else {
            CoalescedPartitionSpec coalescedPartitionSpec = CoalescedPartitionSpec$.MODULE$.apply(reduceIndex, reduceIndex + 1, bytes);
            list = Nil$.MODULE$.$colon$colon((Object)coalescedPartitionSpec);
        }
        return list;
    }

    private OptimizeSkewInRebalancePartitions$() {
        MODULE$ = this;
        AQEShuffleReadRule.$init$(this);
        this.supportedShuffleOrigins = (Seq)new .colon.colon((Object)REBALANCE_PARTITIONS_BY_NONE$.MODULE$, (List)new .colon.colon((Object)REBALANCE_PARTITIONS_BY_COL$.MODULE$, (List)Nil$.MODULE$));
    }
}

