package org.apache.pig.backend.hadoop.executionengine.spark.converter;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigConstants;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.class */
/**
 * Converts a {@link POLoad} physical operator into a Spark {@code RDD<Tuple>}.
 *
 * <p>The conversion configures the shared {@link JobConf} with the load's input
 * description, creates a Hadoop-backed RDD through {@link PigInputFormatSpark},
 * and maps each (key, tuple) record to its tuple via {@link ToTupleFunction}.
 */
public class LoadConverter implements RDDConverter<Tuple, Tuple, POLoad> {
    private static final Log LOG = LogFactory.getLog(LoadConverter.class);

    /** Counter group under which per-load input record counters are created. */
    private static final String INPUT_COUNTER_GROUP = "MultiInputCounters";
    /** Configuration key that receives the load's file name. */
    private static final String INPUT_DIR_KEY = "mapreduce.input.fileinputformat.inputdir";
    /** Boolean configuration key; when true no input counter is wired up. */
    private static final String DISABLE_COUNTER_KEY = "pig.disable.counter";

    private final PigContext pigContext;
    private final PhysicalPlan physicalPlan;
    private final SparkContext sparkContext;
    private final JobConf jobConf;
    private final SparkEngineConf sparkEngineConf;

    /**
     * Function applied to every (key, tuple) record of the Hadoop RDD: returns
     * the tuple half of the pair and, when a counter has been configured,
     * increments it once per record.
     */
    public static class ToTupleFunction extends AbstractFunction1<Tuple2<Text, Tuple>, Tuple> implements Function1<Tuple2<Text, Tuple>, Tuple>, Serializable {
        private String counterGroupName;
        private String counterName;
        private SparkCounters sparkCounters;
        private boolean disableCounter;
        // Not read by any method of this class.
        // NOTE(review): presumably held so that it is serialized together with
        // the function -- confirm before removing or marking it transient.
        private SparkEngineConf sparkEngineConf;
        // Guards the one-time configuration setup performed in apply().
        private boolean initialized;

        /**
         * @param sparkEngineConf engine configuration stored on this function
         */
        public ToTupleFunction(SparkEngineConf sparkEngineConf) {
            this.sparkEngineConf = sparkEngineConf;
        }

        /**
         * Returns the tuple component of the given record.
         *
         * <p>On the first call made on this instance, the current Spark
         * partition id is written as both {@link PigConstants#TASK_INDEX} and
         * {@link MRConfiguration#TASK_ID} into the configuration obtained from
         * {@code PigMapReduce.sJobConfInternal}.
         *
         * @param tuple2 the (key, tuple) record read from the input format
         * @return the second element of {@code tuple2}
         */
        public Tuple apply(Tuple2<Text, Tuple> tuple2) {
            if (!this.initialized) {
                String partition = Long.toString(TaskContext.get().partitionId());
                Configuration taskConf = PigMapReduce.sJobConfInternal.get();
                taskConf.set(PigConstants.TASK_INDEX, partition);
                taskConf.set(MRConfiguration.TASK_ID, partition);
                this.initialized = true;
            }
            if (this.sparkCounters != null && !this.disableCounter) {
                this.sparkCounters.increment(this.counterGroupName, this.counterName, 1L);
            }
            return tuple2._2();
        }

        /** @param str group name passed to the counter on each increment */
        public void setCounterGroupName(String str) {
            this.counterGroupName = str;
        }

        /** @param str counter name passed to the counter on each increment */
        public void setCounterName(String str) {
            this.counterName = str;
        }

        /** @param sparkCounters counters to increment; {@code null} disables counting */
        public void setSparkCounters(SparkCounters sparkCounters) {
            this.sparkCounters = sparkCounters;
        }

        /** @param z {@code true} to skip incrementing even when counters are set */
        public void setDisableCounter(boolean z) {
            this.disableCounter = z;
        }
    }

    /**
     * @param pigContext      source of the script files registered with Spark
     * @param physicalPlan    plan queried for the load operator's successors
     * @param sparkContext    context used to create the RDD and add files
     * @param jobConf         configuration mutated in place by {@link #convert}
     * @param sparkEngineConf engine configuration handed to {@link ToTupleFunction}
     */
    public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan, SparkContext sparkContext, JobConf jobConf, SparkEngineConf sparkEngineConf) {
        this.pigContext = pigContext;
        this.physicalPlan = physicalPlan;
        this.sparkContext = sparkContext;
        this.jobConf = jobConf;
        this.sparkEngineConf = sparkEngineConf;
    }

    /**
     * Builds the RDD of tuples produced by the given load operator.
     *
     * @param list   predecessor RDDs; not used by this converter
     * @param pOLoad the load operator to convert
     * @return an RDD containing the tuple half of every record read
     * @throws IOException if configuring the loader or serializing state fails
     */
    @Override // org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter
    public RDD<Tuple> convert(List<RDD<Tuple>> list, POLoad pOLoad) throws IOException {
        configureLoader(this.physicalPlan, pOLoad, this.jobConf);
        this.jobConf.set(INPUT_DIR_KEY, pOLoad.getLFile().getFileName());
        if (hasMergeJoinSuccessor(pOLoad)) {
            // NOTE(review): the value is taken from an unrelated constant
            // (PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT); this looks like a
            // decompiler constant substitution, presumably for "true" -- confirm.
            this.jobConf.set(PigConfiguration.PIG_NO_SPLIT_COMBINATION, PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT);
        }
        // Must happen before the RDD is created so the RDD sees the serialized state.
        UDFContext.getUDFContext().serialize(this.jobConf);
        RDD<Tuple2<Text, Tuple>> hadoopRdd = this.sparkContext.newAPIHadoopRDD(this.jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
        registerUdfFiles();

        ToTupleFunction toTuple = new ToTupleFunction(this.sparkEngineConf);
        boolean countersDisabled = this.jobConf.getBoolean(DISABLE_COUNTER_KEY, false);
        if (!pOLoad.isTmpLoad() && !countersDisabled) {
            String counterName = SparkStatsUtil.getCounterName(pOLoad);
            SparkCounters counters = SparkPigStatusReporter.getInstance().getCounters();
            if (counters != null) {
                counters.createCounter(INPUT_COUNTER_GROUP, counterName);
            }
            toTuple.setDisableCounter(countersDisabled);
            toTuple.setCounterGroupName(INPUT_COUNTER_GROUP);
            toTuple.setCounterName(counterName);
            toTuple.setSparkCounters(counters);
        }
        return hadoopRdd.map(toTuple, SparkUtil.getManifest(Tuple.class));
    }

    /**
     * Adds every script file known to the Pig context that exists on the local
     * file system to the Spark context.
     *
     * @throws MalformedURLException if a file's URI cannot be converted to a URL
     */
    private void registerUdfFiles() throws MalformedURLException {
        for (File scriptFile : this.pigContext.getScriptFiles().values()) {
            if (scriptFile.exists()) {
                this.sparkContext.addFile(scriptFile.toURI().toURL().toExternalForm());
            }
        }
    }

    /**
     * Records the load's input description in {@code jobConf}: its file spec,
     * the operator keys of its successors in {@code physicalPlan}, its
     * signature and its limit, each serialized as a single-element list.
     *
     * @param physicalPlan plan used to look up the successors of {@code pOLoad}
     * @param pOLoad       the load operator being configured
     * @param jobConf      configuration to update in place
     * @return the same {@code jobConf} instance that was passed in
     * @throws IOException if setting the location or serializing a list fails
     */
    private static JobConf configureLoader(PhysicalPlan physicalPlan, POLoad pOLoad, JobConf jobConf) throws IOException {
        // NOTE(review): the load function is given a new Job built from jobConf;
        // whether settings it applies there reach jobConf itself depends on
        // Job's handling of the passed configuration -- confirm.
        pOLoad.getLoadFunc().setLocation(pOLoad.getLFile().getFileName(), new Job(jobConf));

        ArrayList<Serializable> inputs = new ArrayList<Serializable>();
        inputs.add(pOLoad.getLFile());

        List<Serializable> successorKeys = Lists.newArrayList();
        List<PhysicalOperator> successors = physicalPlan.getSuccessors(pOLoad);
        if (successors != null) {
            for (PhysicalOperator successor : successors) {
                successorKeys.add(successor.getOperatorKey());
            }
        }
        ArrayList<List<Serializable>> targets = Lists.newArrayList();
        targets.add(successorKeys);

        ArrayList<String> signatures = Lists.newArrayList();
        signatures.add(pOLoad.getSignature());

        ArrayList<Long> limits = Lists.newArrayList();
        limits.add(Long.valueOf(pOLoad.getLimit()));

        jobConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inputs));
        jobConf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(targets));
        jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(signatures));
        jobConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(limits));
        return jobConf;
    }

    /**
     * Reports whether a {@link POMergeJoin} is reachable from the given
     * operator by following successor edges in each operator's parent plan.
     *
     * @param physicalOperator operator to start from; may be {@code null}
     * @return {@code true} if any direct or transitive successor is a merge
     *         join; {@code false} otherwise, including when the operator, its
     *         parent plan, or its successor list is {@code null}
     */
    private static boolean hasMergeJoinSuccessor(PhysicalOperator physicalOperator) {
        if (physicalOperator == null || physicalOperator.getParentPlan() == null) {
            return false;
        }
        List<PhysicalOperator> successors = physicalOperator.getParentPlan().getSuccessors(physicalOperator);
        if (successors == null) {
            return false;
        }
        for (PhysicalOperator successor : successors) {
            if (successor instanceof POMergeJoin || hasMergeJoinSuccessor(successor)) {
                return true;
            }
        }
        return false;
    }
}
