package org.apache.pig.backend.hadoop.executionengine.spark.converter;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import scala.math.Ordering;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.class */
public class ReduceByConverter implements RDDConverter<Tuple, Tuple, POReduceBySpark> {
    private static final Log LOG = LogFactory.getLog(ReduceByConverter.class);
    private static final TupleFactory tf = TupleFactory.getInstance();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter$LocalRearrangeFunction.class */
    /**
     * Scala {@code Function1} that pushes one tuple through a {@link POLocalRearrange}
     * and repackages the operator's output as an {@code (IndexedKey, Tuple)} pair.
     *
     * <p>The value tuple that is produced has two fields: the key (field 1 of the
     * operator output) followed by field 2 of the operator output.
     */
    public static class LocalRearrangeFunction extends AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
        private final POLocalRearrange lra;
        private boolean useSecondaryKey;
        private boolean[] secondarySortOrder;

        /**
         * @param pOLocalRearrange operator applied to every incoming tuple
         * @param z                whether secondary-key settings are copied onto each produced key
         * @param zArr             secondary sort order; only retained when {@code z} is {@code true}
         */
        public LocalRearrangeFunction(POLocalRearrange pOLocalRearrange, boolean z, boolean[] zArr) {
            this.lra = pOLocalRearrange;
            // The secondary-key fields keep their defaults (false / null) unless requested.
            if (z) {
                this.useSecondaryKey = z;
                this.secondarySortOrder = zArr;
            }
        }

        /**
         * Runs the local rearrange on {@code tuple} and converts its result.
         *
         * @param tuple input record
         * @return pair of the indexed key and a two-field value tuple
         * @throws RuntimeException if the operator yields no result, yields a status other
         *                          than 0, or raises an {@link ExecException}
         */
        public Tuple2<IndexedKey, Tuple> apply(Tuple tuple) {
            if (ReduceByConverter.LOG.isDebugEnabled()) {
                ReduceByConverter.LOG.debug("LocalRearrangeFunction in " + tuple);
            }
            try {
                this.lra.setInputs(null);
                this.lra.attachInput(tuple);
                Result response = this.lra.getNextTuple();
                if (response == null) {
                    throw new RuntimeException("Null response found for LocalRearange on tuple: " + tuple);
                }
                // NOTE(review): 0 is presumably the "OK" status code - confirm against POStatus.
                if (response.returnStatus != 0) {
                    throw new RuntimeException("Unexpected response code from operator " + this.lra + " : " + response);
                }

                // Operator output is read as (index, key, value).
                Tuple rearranged = (Tuple) response.result;
                Object key = rearranged.get(1);
                byte index = ((Byte) rearranged.get(0)).byteValue();
                IndexedKey indexedKey = new IndexedKey(index, key);
                if (this.useSecondaryKey) {
                    indexedKey.setUseSecondaryKey(this.useSecondaryKey);
                    indexedKey.setSecondarySortOrder(this.secondarySortOrder);
                }

                Tuple keyAndValue = TupleFactory.getInstance().newTuple();
                keyAndValue.append(key);
                keyAndValue.append(rearranged.get(2));

                Tuple2<IndexedKey, Tuple> pair = new Tuple2<>(indexedKey, keyAndValue);
                if (ReduceByConverter.LOG.isDebugEnabled()) {
                    ReduceByConverter.LOG.debug("LocalRearrangeFunction out " + pair);
                }
                return pair;
            } catch (ExecException e) {
                throw new RuntimeException("Couldn't do LocalRearange on tuple: " + tuple, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter$MergeValuesFunction.class */
    /**
     * Reduce function passed to {@code reduceByKey}: merges two {@code (key, value)} tuples
     * that share a key by packaging both values into a bag, running them through the
     * packager and the {@link POReduceBySpark} operator, and re-wrapping the operator output
     * as a two-field {@code (key, valueTuple)} tuple.
     */
    public static final class MergeValuesFunction extends AbstractFunction2<Tuple, Tuple, Tuple> implements Serializable {
        private final POReduceBySpark poReduce;

        /**
         * @param pOReduceBySpark operator whose packager and reduce plan are applied to each pair
         */
        public MergeValuesFunction(POReduceBySpark pOReduceBySpark) {
            this.poReduce = pOReduceBySpark;
        }

        /**
         * Merges two tuples of the form {@code (key, value)}.
         *
         * @param tuple  first tuple; its field 0 supplies the key and its field 1 a value
         * @param tuple2 second tuple; only its field 1 is read
         * @return a two-field tuple: field 0 is the key (left unset when the key is
         *         {@code null}), field 1 is a tuple of the reduce output fields that are
         *         not equal to the key
         * @throws RuntimeException wrapping any {@link ExecException} raised while merging
         */
        public Tuple apply(Tuple tuple, Tuple tuple2) {
            // Guard the debug output: this runs for every merged pair and the message
            // stringifies whole tuples.
            final boolean debug = ReduceByConverter.LOG.isDebugEnabled();
            if (debug) {
                ReduceByConverter.LOG.debug("MergeValuesFunction in : " + tuple + " , " + tuple2);
            }
            Tuple merged = ReduceByConverter.tf.newTuple(2);
            DataBag values = DefaultBagFactory.getInstance().newDefaultBag();
            try {
                Object key = tuple.get(0);
                if (key == null) {
                    // The packager is handed "" in place of a null key; slot 0 of the
                    // result is deliberately not set in that case.
                    key = "";
                } else {
                    merged.set(0, key);
                }
                values.add((Tuple) tuple.get(1));
                values.add((Tuple) tuple2.get(1));

                this.poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{values}, new boolean[]{true});
                Tuple packaged = (Tuple) this.poReduce.getPKGOp().getPkgr().getNext().result;
                if (debug) {
                    // Same content the log line has always shown: the (key, bag) packager input.
                    DefaultTuple packagerInput = new DefaultTuple();
                    packagerInput.append(key);
                    packagerInput.append(values);
                    ReduceByConverter.LOG.debug("MergeValuesFunction packagedTuple : " + packagerInput);
                }

                this.poReduce.attachInput(packaged);
                Result reduced = this.poReduce.getNext(this.poReduce.getResultType());

                // Rebuild the value side as its own tuple, skipping fields equal to the key.
                // NOTE(review): this drops ANY field equal to the key, not only the key
                // position - a computed value that happens to equal the key is lost. Confirm
                // whether that is intended.
                Tuple valueTuple = ReduceByConverter.tf.newTuple();
                for (Object field : ((Tuple) reduced.result).getAll()) {
                    // A null field can never be the (non-null) key; keep it instead of
                    // failing with a NullPointerException on field.equals(...).
                    if (field == null || !field.equals(key)) {
                        valueTuple.append(field);
                    }
                }
                merged.set(1, valueTuple);
                if (debug) {
                    ReduceByConverter.LOG.debug("MergeValuesFunction out : " + merged);
                }
                return merged;
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter$ToKeyNullValueFunction.class */
    /**
     * Scala {@code Function1} that turns a tuple into a pair whose key is the tuple itself
     * and whose value is {@code null}.
     */
    private static class ToKeyNullValueFunction extends AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements Serializable {
        private ToKeyNullValueFunction() {
        }

        /**
         * @param tuple record to use as the key
         * @return {@code (tuple, null)}
         */
        public Tuple2<Tuple, Object> apply(Tuple tuple) {
            if (ReduceByConverter.LOG.isDebugEnabled()) {
                ReduceByConverter.LOG.debug("ToKeyNullValueFunction in " + tuple);
            }
            Tuple2<Tuple, Object> keyOnly = new Tuple2<Tuple, Object>(tuple, null);
            if (ReduceByConverter.LOG.isDebugEnabled()) {
                ReduceByConverter.LOG.debug("ToKeyNullValueFunction out " + keyOnly);
            }
            return keyOnly;
        }
    }

    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter$ToKeyValueFunction.class */
    /**
     * Spark {@code Function} that converts a three-field tuple {@code (index, key, value)}
     * into an {@code (IndexedKey, Tuple)} pair whose value tuple is {@code (key, value)}.
     *
     * <p>When the operator uses a secondary key, field 1 is itself read as a tuple and only
     * its first field is used as the key.
     */
    private static class ToKeyValueFunction implements Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
        // Assigned once in the constructor; may be null, in which case no secondary key is assumed.
        private final POReduceBySpark poReduce;

        /**
         * @param pOReduceBySpark operator consulted for the secondary-key setting (may be {@code null})
         */
        public ToKeyValueFunction(POReduceBySpark pOReduceBySpark) {
            this.poReduce = pOReduceBySpark;
        }

        /**
         * @param tuple input of the form {@code (index, key, value)}
         * @return the indexed key paired with a {@code (key, value)} tuple
         * @throws RuntimeException wrapping any {@link ExecException} raised while reading fields
         */
        public Tuple2<IndexedKey, Tuple> call(Tuple tuple) {
            try {
                if (ReduceByConverter.LOG.isDebugEnabled()) {
                    ReduceByConverter.LOG.debug("ToKeyValueFunction in " + tuple);
                }

                Object key;
                if (this.poReduce != null && this.poReduce.isUseSecondaryKey()) {
                    // Field 1 is a compound key tuple; its first element is used as the key.
                    key = ((Tuple) tuple.get(1)).get(0);
                } else {
                    key = tuple.get(1);
                }

                Tuple keyAndValue = ReduceByConverter.tf.newTuple();
                keyAndValue.append(key);
                keyAndValue.append(tuple.get(2));

                byte index = ((Byte) tuple.get(0)).byteValue();
                Tuple2<IndexedKey, Tuple> pair = new Tuple2<>(new IndexedKey(index, key), keyAndValue);
                if (ReduceByConverter.LOG.isDebugEnabled()) {
                    ReduceByConverter.LOG.debug("ToKeyValueFunction out " + pair);
                }
                return pair;
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter$ToTupleFunction.class */
    /**
     * Scala {@code Function1} applied after {@code reduceByKey}: takes the value side of an
     * {@code (IndexedKey, Tuple)} pair, wraps its value in a single-element bag, and returns
     * whatever the operator's packager produces for that key and bag.
     */
    public static final class ToTupleFunction extends AbstractFunction1<Tuple2<IndexedKey, Tuple>, Tuple> implements Serializable {
        private final POReduceBySpark poReduce;

        /**
         * @param pOReduceBySpark operator whose packager builds the output tuple
         */
        public ToTupleFunction(POReduceBySpark pOReduceBySpark) {
            this.poReduce = pOReduceBySpark;
        }

        /**
         * @param tuple2 pair whose value is a tuple of the form {@code (key, value)};
         *               the pair's own key is not read
         * @return the {@code result} of the packager's next output
         * @throws RuntimeException wrapping any {@link ExecException} raised by the packager
         *                          or while reading tuple fields
         */
        public Tuple apply(Tuple2<IndexedKey, Tuple> tuple2) {
            // Guard the debug output: this runs once per record and stringifies whole tuples.
            final boolean debug = ReduceByConverter.LOG.isDebugEnabled();
            if (debug) {
                ReduceByConverter.LOG.debug("ToTupleFunction in : " + tuple2);
            }
            DataBag values = DefaultBagFactory.getInstance().newDefaultBag();
            try {
                Tuple keyAndValue = (Tuple) tuple2._2();
                Object key = keyAndValue.get(0);
                values.add((Tuple) keyAndValue.get(1));

                // The bag is handed to the packager directly; no intermediate tuple is needed.
                this.poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{values}, new boolean[]{true});
                Tuple packaged = (Tuple) this.poReduce.getPKGOp().getPkgr().getNext().result;
                if (debug) {
                    ReduceByConverter.LOG.debug("ToTupleFunction out : " + packaged);
                }
                return packaged;
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Converts the single predecessor RDD of a {@link POReduceBySpark} operator.
     *
     * <p>Every input tuple is first keyed via {@link LocalRearrangeFunction}. If the operator
     * uses a secondary key, the keyed RDD is handed to
     * {@code SecondaryKeySortUtil.handleSecondarySort}; otherwise values are merged per key
     * with {@link MergeValuesFunction} and mapped back to tuples with {@link ToTupleFunction}.
     *
     * @param list            predecessor RDDs; checked via {@code SparkUtil.assertPredecessorSize} with size 1
     * @param pOReduceBySpark operator being converted
     * @return the resulting tuple RDD
     * @throws IOException declared by the converter interface
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter
    public RDD<Tuple> convert(List<RDD<Tuple>> list, POReduceBySpark pOReduceBySpark) throws IOException {
        SparkUtil.assertPredecessorSize(list, pOReduceBySpark, 1);
        // NOTE(review): the return value is unused; kept because get() may initialise
        // shared state - confirm against SparkPigContext before removing.
        SparkPigContext.get();
        int parallelism = SparkPigContext.getParallelism(list, pOReduceBySpark);

        // Step 1: key every input tuple.
        RDD<Tuple> input = list.get(0);
        LocalRearrangeFunction rearrange = new LocalRearrangeFunction(
                pOReduceBySpark.getLROp(),
                pOReduceBySpark.isUseSecondaryKey(),
                pOReduceBySpark.getSecondarySortOrder());
        RDD keyed = input.map(rearrange, SparkUtil.getTuple2Manifest());

        // Step 2a: secondary-key plans are delegated entirely.
        if (pOReduceBySpark.isUseSecondaryKey()) {
            return SecondaryKeySortUtil.handleSecondarySort(keyed, pOReduceBySpark.getPKGOp());
        }

        // Step 2b: merge values per key, then unwrap back into plain tuples.
        PairRDDFunctions pairFunctions = new PairRDDFunctions(
                keyed,
                SparkUtil.getManifest(IndexedKey.class),
                SparkUtil.getManifest(Tuple.class),
                (Ordering) null);
        RDD reduced = pairFunctions.reduceByKey(
                SparkUtil.getPartitioner(pOReduceBySpark.getCustomPartitioner(), parallelism),
                new MergeValuesFunction(pOReduceBySpark));
        LOG.debug("Custom Partitioner and parallelims used : " + pOReduceBySpark.getCustomPartitioner() + ", " + parallelism);
        return reduced.map(new ToTupleFunction(pOReduceBySpark), SparkUtil.getManifest(Tuple.class));
    }

    /**
     * Repartitions and sorts the tuples of {@code rdd} using a {@link HashPartitioner} and a
     * {@code PigSecondaryKeyComparatorSpark}, then converts each sorted tuple into an
     * {@code (IndexedKey, Tuple)} pair via {@link ToKeyValueFunction}.
     *
     * <p>NOTE(review): no caller of this private method is visible in this file.
     *
     * @param rdd             tuples to sort
     * @param pOReduceBySpark operator supplying the secondary sort order
     * @param i               number of partitions for the hash partitioner
     * @return the sorted, re-keyed RDD
     */
    private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(RDD<Tuple> rdd, POReduceBySpark pOReduceBySpark, int i) {
        // Each tuple becomes the key of a (tuple, null) pair so it can be sorted as a key.
        RDD<Tuple2<Tuple, Object>> tupleAsKey = rdd.map(new ToKeyNullValueFunction(), SparkUtil.getTuple2Manifest());
        JavaPairRDD pairs = new JavaPairRDD(tupleAsKey, SparkUtil.getManifest(Tuple.class), SparkUtil.getManifest(Object.class));

        JavaPairRDD sorted = pairs.repartitionAndSortWithinPartitions(
                new HashPartitioner(i),
                new PigSecondaryKeyComparatorSpark(pOReduceBySpark.getSecondarySortOrder()));

        // Drop the null values again and re-key the sorted tuples.
        JavaRDD sortedTuples = sorted.keys();
        return sortedTuples.map(new ToKeyValueFunction(pOReduceBySpark));
    }
}
