/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.spark.SparkRecordHandler;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkMapRecordHandler
extends SparkRecordHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SparkMapRecordHandler.class);
    private MapOperator mo;
    private MapredLocalWork localWork = null;
    private boolean isLogInfoEnabled = false;
    private ExecMapperContext execContext;

    @Override
    public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
        this.perfLogger.PerfLogBegin(CLASS_NAME, "SparkInitializeOperators");
        super.init(job, output, reporter);
        this.isLogInfoEnabled = LOG.isInfoEnabled();
        try {
            this.jc = job;
            this.execContext = new ExecMapperContext(this.jc);
            MapWork mrwork = Utilities.getMapWork((Configuration)job);
            CompilationOpContext runtimeCtx = new CompilationOpContext();
            this.mo = mrwork.getVectorMode() ? new VectorMapOperator(runtimeCtx) : new MapOperator(runtimeCtx);
            this.mo.setConf(mrwork);
            this.mo.initialize((Configuration)this.jc, null);
            this.mo.setChildren((Configuration)job);
            LOG.info(this.mo.dump(0));
            this.localWork = mrwork.getMapRedLocalWork();
            this.execContext.setLocalWork(this.localWork);
            MapredContext.init(true, new JobConf((Configuration)this.jc));
            MapredContext.get().setReporter(reporter);
            this.mo.passExecContext(this.execContext);
            this.mo.initializeLocalWork((Configuration)this.jc);
            this.mo.initializeMapOperator((Configuration)this.jc);
            OperatorUtils.setChildrenCollector(this.mo.getChildOperators(), output);
            this.mo.setReporter(this.rp);
            if (this.localWork == null) {
                return;
            }
            LOG.info("Initializing dummy operator");
            List<Operator<? extends OperatorDesc>> dummyOps = this.localWork.getDummyParentOp();
            for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
                dummyOp.setExecContext(this.execContext);
                dummyOp.initialize((Configuration)this.jc, null);
            }
        }
        catch (Throwable e) {
            this.abort = true;
            if (e instanceof OutOfMemoryError) {
                throw (OutOfMemoryError)e;
            }
            throw new RuntimeException("Map operator initialization failed: " + e, e);
        }
        this.perfLogger.PerfLogEnd(CLASS_NAME, "SparkInitializeOperators");
    }

    @Override
    public void processRow(Object key, Object value) throws IOException {
        this.execContext.resetRow();
        try {
            this.mo.process((Writable)value);
            if (this.isLogInfoEnabled) {
                this.logMemoryInfo();
            }
        }
        catch (Throwable e) {
            this.abort = true;
            Utilities.setMapWork((Configuration)this.jc, null);
            if (e instanceof OutOfMemoryError) {
                throw (OutOfMemoryError)e;
            }
            String msg = "Error processing row: " + e;
            LOG.error(msg, e);
            throw new RuntimeException(msg, e);
        }
    }

    @Override
    public <E> void processRow(Object key, Iterator<E> values) throws IOException {
        throw new UnsupportedOperationException("Do not support this method in SparkMapRecordHandler.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        if (this.oc == null) {
            LOG.trace("Close called. no row processed by map.");
        }
        if (!this.abort) {
            this.abort = this.execContext.getIoCxt().getIOExceptions();
        }
        try {
            this.mo.close(this.abort);
            if (this.localWork != null) {
                List<Operator<? extends OperatorDesc>> dummyOps = this.localWork.getDummyParentOp();
                for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
                    dummyOp.close(this.abort);
                }
            }
            if (this.isLogInfoEnabled) {
                this.logCloseInfo();
            }
            ExecMapper.ReportStats rps = new ExecMapper.ReportStats(this.rp, (Configuration)this.jc);
            this.mo.preorderMap(rps);
            return;
        }
        catch (Exception e) {
            if (!this.abort) {
                String msg = "Hit error while closing operators - failing tree: " + e;
                LOG.error(msg, (Throwable)e);
                throw new IllegalStateException(msg, e);
            }
        }
        finally {
            MapredContext.close();
            Utilities.clearWorkMap((Configuration)this.jc);
        }
    }

    @Override
    public boolean getDone() {
        return this.mo.getDone();
    }
}

