package org.apache.pig.backend.hadoop.executionengine.spark.converter;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import scala.math.Ordering;

/* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.class */
public class StoreConverter implements RDDConverter<Tuple, Tuple2<Text, Tuple>, POStore> {
    private static final Log LOG = LogFactory.getLog(StoreConverter.class);
    private JobConf jobConf;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter$FromTupleFunction.class */
    public static class FromTupleFunction implements Function<Tuple, Tuple2<Text, Tuple>> {
        private static Text EMPTY_TEXT = new Text();
        private String counterGroupName;
        private String counterName;
        private SparkCounters sparkCounters;
        private boolean disableCounter;

        private FromTupleFunction() {
        }

        public Tuple2<Text, Tuple> call(Tuple tuple) {
            if (this.sparkCounters != null && !this.disableCounter) {
                this.sparkCounters.increment(this.counterGroupName, this.counterName, 1L);
            }
            return new Tuple2<>(EMPTY_TEXT, tuple);
        }

        public void setCounterGroupName(String str) {
            this.counterGroupName = str;
        }

        public void setCounterName(String str) {
            this.counterName = str;
        }

        public void setSparkCounters(SparkCounters sparkCounters) {
            this.sparkCounters = sparkCounters;
        }

        public void setDisableCounter(boolean z) {
            this.disableCounter = z;
        }
    }

    public StoreConverter(JobConf jobConf) {
        this.jobConf = null;
        this.jobConf = jobConf;
    }

    @Override // org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter
    public RDD<Tuple2<Text, Tuple>> convert(List<RDD<Tuple>> list, POStore pOStore) throws IOException {
        SparkUtil.assertPredecessorSize(list, pOStore, 1);
        RDD<Tuple> rdd = list.get(0);
        SparkPigStatusReporter.getInstance().createCounter("MultiStoreCounters", SparkStatsUtil.getCounterName(pOStore));
        JavaRDD map = rdd.toJavaRDD().map(buildFromTupleFunction(pOStore));
        PairRDDFunctions pairRDDFunctions = new PairRDDFunctions(map.rdd(), SparkUtil.getManifest(Text.class), SparkUtil.getManifest(Tuple.class), (Ordering) null);
        POStore configureStorer = configureStorer(this.jobConf, pOStore);
        if (PigConfiguration.PIG_STORE_SCHEMA_DISAMBIGUATE_DEFAULT.equalsIgnoreCase(this.jobConf.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
            Job job = new Job(this.jobConf);
            LazyOutputFormat.setOutputFormatClass(job, PigOutputFormat.class);
            this.jobConf = job.getConfiguration();
            this.jobConf.setOutputKeyClass(Text.class);
            this.jobConf.setOutputValueClass(Tuple.class);
            FileOutputFormat.setOutputPath(this.jobConf, new Path(configureStorer.getSFile().getFileName()));
            pairRDDFunctions.saveAsNewAPIHadoopDataset(this.jobConf);
        } else {
            pairRDDFunctions.saveAsNewAPIHadoopFile(configureStorer.getSFile().getFileName(), Text.class, Tuple.class, PigOutputFormat.class, this.jobConf);
        }
        RDD<Tuple2<Text, Tuple>> rdd2 = map.rdd();
        if (LOG.isDebugEnabled()) {
            LOG.debug("RDD lineage: " + rdd2.toDebugString());
        }
        return rdd2;
    }

    private static POStore configureStorer(JobConf jobConf, PhysicalOperator physicalOperator) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        POStore pOStore = (POStore) physicalOperator;
        newArrayList.add(pOStore);
        pOStore.getStoreFunc().setStoreLocation(pOStore.getSFile().getFileName(), new Job(jobConf));
        pOStore.setInputs(null);
        pOStore.setParentPlan(null);
        jobConf.set(JobControlCompiler.PIG_MAP_STORES, ObjectSerializer.serialize(Lists.newArrayList()));
        jobConf.set(JobControlCompiler.PIG_REDUCE_STORES, ObjectSerializer.serialize(newArrayList));
        return pOStore;
    }

    private FromTupleFunction buildFromTupleFunction(POStore pOStore) {
        FromTupleFunction fromTupleFunction = new FromTupleFunction();
        boolean disableCounter = pOStore.disableCounter();
        if (!pOStore.isTmpStore() && !disableCounter) {
            fromTupleFunction.setDisableCounter(disableCounter);
            fromTupleFunction.setCounterGroupName("MultiStoreCounters");
            fromTupleFunction.setCounterName(SparkStatsUtil.getCounterName(pOStore));
            fromTupleFunction.setSparkCounters(SparkPigStatusReporter.getInstance().getCounters());
        }
        return fromTupleFunction;
    }
}
