/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.io.api.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.org.apache.commons.lang3.StringUtils;
import org.apache.hive.org.slf4j.Logger;
import org.apache.hive.org.slf4j.LoggerFactory;
import org.apache.hive.org.slf4j.MDC;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;
import org.apache.tez.common.counters.TezCounters;

/**
 * Record reader that fills {@link VectorizedRowBatch}es from the column vector batches
 * delivered by a {@link ReadPipeline}.
 *
 * <p>The pipeline is created in the constructor with this object registered as its
 * {@link Consumer}; {@link #start()} submits the pipeline's read callable to the executor.
 * The pipeline reports results through {@link #consumeData(ColumnVectorBatch)},
 * {@link #setDone()} and {@link #setError(Throwable)}, while {@link #next} blocks until one
 * of those has happened.
 *
 * <p>All writes to {@code pendingData}, {@code isDone} and {@code pendingError} are made
 * while holding the monitor of {@code pendingData}; waiters are woken with
 * {@code notifyAll()} on that same object.
 */
class LlapRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch>,
Consumer<ColumnVectorBatch> {
    private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);

    /** Upper bound, in milliseconds, of a single wait for the producer in {@link #nextCvb()}. */
    private static final long PENDING_WAIT_MS = 100L;

    private final FileSplit split;
    /** Row-batch column index for each column returned by the pipeline; never null. */
    private final List<Integer> columnIds;
    private final SearchArgument sarg;
    private final String[] columnNames;
    private final VectorizedRowBatchCtx rbCtx;
    /** Partition column values for the split, or null when there are no partition columns. */
    private final Object[] partitionValues;
    /** Batches handed over by the pipeline and not yet consumed; also used as the lock/condition. */
    private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
    /** Batch returned by the previous {@link #nextCvb()} call; given back to the pipeline on the next call. */
    private ColumnVectorBatch lastCvb = null;
    private boolean isFirst = true;
    private Throwable pendingError = null;
    private boolean isDone = false;
    /** Never set to true in this class; only reported in log messages. */
    private final boolean isClosed = false;
    private final ConsumerFeedback<ColumnVectorBatch> feedback;
    private final QueryFragmentCounters counters;
    private long firstReturnTime;
    private final JobConf jobConf;
    private final ReadPipeline rp;
    private final ExecutorService executor;
    private final int columnCount;
    private final boolean isAcidScan;

    /**
     * Builds a reader for the given split.
     *
     * @return the reader, or {@code null} when no {@link MapWork} is available for the job or
     *         when the ORC schema evolution check fails (the half-built reader is closed first)
     * @throws IOException if closing a rejected reader rethrows a pending pipeline error, or if
     *         construction fails
     * @throws HiveException propagated from the map-work lookup or from construction
     */
    public static LlapRecordReader create(JobConf job, FileSplit split, List<Integer> includedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter) throws IOException, HiveException {
        MapWork mapWork = findMapWork(job);
        if (mapWork == null) {
            return null;
        }
        LlapRecordReader reader = new LlapRecordReader(mapWork, job, split, includedCols, hostName, cvp, executor, sourceInputFormat, sourceSerDe, reporter);
        if (!reader.checkOrcSchemaEvolution()) {
            reader.close();
            return null;
        }
        return reader;
    }

    /**
     * Initializes logging context, counters, the row-batch context and the read pipeline.
     *
     * @param includedCols row-batch column indexes to read; {@code null} means every column
     *        described by the row-batch context, in order
     */
    private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, List<Integer> includedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe, Reporter reporter) throws IOException, HiveException {
        this.executor = executor;
        this.jobConf = job;
        this.split = split;
        this.sarg = ConvertAstToSearchArg.createFromConf(job);
        this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);

        // Tag this thread's log output with the identifiers of the work being processed.
        String fragmentId = LlapTezUtils.getFragmentId(job);
        String dagId = LlapTezUtils.getDagId(job);
        String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
        MDC.put("dagId", dagId);
        MDC.put("queryId", queryId);
        TezCounters taskCounters = null;
        if (fragmentId != null) {
            MDC.put("fragmentId", fragmentId);
            taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
            LOG.info("Received fragment id: {}", fragmentId);
        } else {
            LOG.warn("Not using tez counters as fragment id string is null");
        }
        this.counters = new QueryFragmentCounters(job, taskCounters);
        this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);

        this.isAcidScan = HiveConf.getBoolVar(this.jobConf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
        TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(job, this.isAcidScan, Integer.MAX_VALUE);

        VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
        this.rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork);

        // Resolve the effective column list BEFORE storing it: a null list means "all columns",
        // so it must be expanded first, otherwise columnIds would stay null and size() would fail.
        List<Integer> effectiveCols = includedCols;
        if (effectiveCols == null) {
            int totalColumns = this.rbCtx.getRowColumnTypeInfos().length;
            effectiveCols = new ArrayList<Integer>(totalColumns);
            for (int i = 0; i < totalColumns; ++i) {
                effectiveCols.add(i);
            }
        }
        this.columnIds = effectiveCols;
        this.columnCount = effectiveCols.size();

        int partitionColumnCount = this.rbCtx.getPartitionColumnCount();
        if (partitionColumnCount > 0) {
            this.partitionValues = new Object[partitionColumnCount];
            VectorizedRowBatchCtx.getPartitionValues(this.rbCtx, mapWork, split, this.partitionValues);
        } else {
            this.partitionValues = null;
        }

        // The pipeline doubles as the feedback channel used to return batches and to stop reading.
        ReadPipeline pipeline = cvp.createReadPipeline(this, split, this.columnIds, this.sarg, this.columnNames, this.counters, schema, sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
        this.rp = pipeline;
        this.feedback = pipeline;
    }

    /**
     * Looks up the {@link MapWork} for the job.
     *
     * @return {@code null} when {@code hive.tez.merge.file.prefixes} is set to a non-blank
     *         value, otherwise the result of {@link Utilities#getMapWork}
     */
    private static MapWork findMapWork(JobConf job) throws HiveException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initializing for input " + job.get("iocontext.input.name", null));
        }
        // NOTE(review): presumably jobs with merge-work prefixes are not supported by this
        // reader, hence the null — confirm against the caller.
        // isBlank() already treats null as blank, so no separate null check is needed.
        if (!StringUtils.isBlank(job.get("hive.tez.merge.file.prefixes"))) {
            return null;
        }
        return Utilities.getMapWork(job);
    }

    /**
     * Submits the pipeline's read callable to the executor. When the executor is a
     * {@link StatsRecordingThreadPool}, an uncaught-exception handler is installed first so
     * that failures are routed to {@link #setError(Throwable)}.
     */
    public void start() {
        if (this.executor instanceof StatsRecordingThreadPool) {
            ((StatsRecordingThreadPool)this.executor).setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
        }
        this.executor.submit(this.rp.getReadCallable());
    }

    /**
     * Checks every projected column against the pipeline's {@link SchemaEvolution}.
     *
     * @return {@code false} as soon as one column's conversion is not PPD-safe, else {@code true}
     */
    private boolean checkOrcSchemaEvolution() {
        SchemaEvolution evolution = this.rp.getSchemaEvolution();
        int rootColumn = OrcInputFormat.getRootColumn(!this.isAcidScan);
        for (int i = 0; i < this.columnCount; ++i) {
            // File column id = root column + projected column index + 1.
            int fileColId = rootColumn + this.columnIds.get(i) + 1;
            if (!evolution.isPPDSafeConversion(fileColId)) {
                LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", this.split);
                return false;
            }
        }
        return true;
    }

    /**
     * Moves the next available batch of column vectors into {@code value}.
     *
     * <p>On the first call the partition column values (if any) are added to {@code value}.
     * The call blocks until the pipeline has delivered a batch, finished, or failed.
     *
     * @return {@code true} if {@code value} was filled, {@code false} when the pipeline is done
     * @throws IOException if the pipeline reported an error, or wrapping the
     *         {@link InterruptedException} when the wait is interrupted (the pipeline is
     *         stopped first in that case)
     * @throws RuntimeException if the batch has a different number of columns than expected
     */
    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
        assert (value != null);
        boolean wasFirst = this.isFirst;
        if (wasFirst) {
            if (this.partitionValues != null) {
                this.rbCtx.addPartitionColsToBatch(value, this.partitionValues);
            }
            this.isFirst = false;
        }
        ColumnVectorBatch cvb;
        try {
            cvb = this.nextCvb();
        }
        catch (InterruptedException e) {
            // Stop the background read before surfacing the interruption to the caller.
            this.feedback.stop();
            throw new IOException(e);
        }
        if (cvb == null) {
            // End of data: record the consumer time measured from the first returned batch.
            if (wasFirst) {
                this.firstReturnTime = this.counters.startTimeCounter();
            }
            this.counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, this.firstReturnTime);
            return false;
        }
        if (this.columnCount != cvb.cols.length) {
            throw new RuntimeException("Unexpected number of columns, VRB has " + this.columnCount + " included, but the reader returned " + cvb.cols.length);
        }
        // Swap each delivered vector into its slot in the caller's row batch.
        for (int i = 0; i < cvb.cols.length; ++i) {
            cvb.swapColumnVector(i, value.cols, this.columnIds.get(i));
        }
        value.selectedInUse = false;
        value.size = cvb.size;
        if (wasFirst) {
            this.firstReturnTime = this.counters.startTimeCounter();
        }
        return true;
    }

    /** Returns the row-batch context used to create and populate batches. */
    public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
        return this.rbCtx;
    }

    /**
     * Returns the previously handed-out batch to the pipeline and waits for the next one.
     *
     * @return the next batch, or {@code null} when the queue is empty after the wait ends
     *         (i.e. the pipeline signalled completion)
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws IOException if the pipeline reported an error
     */
    ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
        if (this.lastCvb != null) {
            this.feedback.returnData(this.lastCvb);
        }
        synchronized (this.pendingData) {
            boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && this.isNothingToReport();
            if (doLogBlocking) {
                LlapIoImpl.LOG.trace("next will block");
            }
            // Timed wait in a loop: re-check the condition after every wake-up or timeout.
            while (this.isNothingToReport()) {
                this.pendingData.wait(PENDING_WAIT_MS);
            }
            if (doLogBlocking) {
                LlapIoImpl.LOG.trace("next is unblocked");
            }
            this.rethrowErrorIfAny();
            this.lastCvb = this.pendingData.poll();
        }
        if (LlapIoImpl.LOG.isTraceEnabled() && this.lastCvb != null) {
            LlapIoImpl.LOG.trace("Processing will receive vector {}", this.lastCvb);
        }
        return this.lastCvb;
    }

    /** True while there is no batch, no error and no completion signal to act on. */
    private boolean isNothingToReport() {
        return !this.isDone && this.pendingData.isEmpty() && this.pendingError == null;
    }

    public NullWritable createKey() {
        return NullWritable.get();
    }

    public VectorizedRowBatch createValue() {
        return this.rbCtx.createVectorizedRowBatch();
    }

    /** Position is not tracked; always returns -1. */
    public long getPos() throws IOException {
        return -1L;
    }

    /**
     * Logs the counters, stops the pipeline and rethrows any pending pipeline error.
     *
     * <p>The logging context installed by the constructor is cleared even when stopping the
     * pipeline or rethrowing the pending error throws.
     *
     * @throws IOException the pending error, if the pipeline reported one
     */
    public void close() throws IOException {
        if (LlapIoImpl.LOG.isTraceEnabled()) {
            LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}", this.isClosed, this.isDone, this.pendingError, this.pendingData.size());
        }
        LlapIoImpl.LOG.info("Llap counters: {}", this.counters);
        try {
            this.feedback.stop();
            this.rethrowErrorIfAny();
        } finally {
            MDC.clear();
        }
    }

    /** Throws the pending error, wrapped in an {@link IOException} unless it already is one. */
    private void rethrowErrorIfAny() throws IOException {
        Throwable error = this.pendingError;
        if (error == null) {
            return;
        }
        if (error instanceof IOException) {
            throw (IOException)error;
        }
        throw new IOException(error);
    }

    /** Marks the stream as complete and wakes any thread waiting in {@link #nextCvb()}. */
    public void setDone() {
        if (LlapIoImpl.LOG.isDebugEnabled()) {
            LlapIoImpl.LOG.debug("setDone called; closed {}, done {}, err {}, pending {}", this.isClosed, this.isDone, this.pendingError, this.pendingData.size());
        }
        synchronized (this.pendingData) {
            this.isDone = true;
            this.pendingData.notifyAll();
        }
    }

    /** Queues a batch for the consumer and wakes any thread waiting in {@link #nextCvb()}. */
    public void consumeData(ColumnVectorBatch data) {
        if (LlapIoImpl.LOG.isTraceEnabled()) {
            LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}", this.isClosed, this.isDone, this.pendingError, this.pendingData.size());
        }
        synchronized (this.pendingData) {
            this.pendingData.add(data);
            this.pendingData.notifyAll();
        }
    }

    /**
     * Records a pipeline failure (replacing any previously recorded one), bumps the error
     * counter and wakes any thread waiting in {@link #nextCvb()}.
     */
    public void setError(Throwable t) {
        this.counters.incrCounter(LlapIOCounters.NUM_ERRORS);
        LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}", this.isClosed, this.isDone, this.pendingError, this.pendingData.size());
        assert (t != null);
        synchronized (this.pendingData) {
            this.pendingError = t;
            this.pendingData.notifyAll();
        }
    }

    /** Progress is not tracked; always returns 0. */
    public float getProgress() throws IOException {
        return 0.0f;
    }

    /** Routes uncaught exceptions from executor threads into {@link LlapRecordReader#setError}. */
    private final class IOUncaughtExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        private IOUncaughtExceptionHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // The throwable is passed as an extra trailing argument so the stack trace is kept
            // (assumes LlapIoImpl.LOG follows SLF4J's trailing-throwable convention, as its
            // "{}" placeholders suggest).
            LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {} Message: {}", t.getName(), t.getId(), e.getMessage(), e);
            LlapRecordReader.this.setError(e);
        }
    }
}

