/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.io.api.impl;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hive.common.util.HiveStringUtils;

public class LlapInputFormat
implements InputFormat<NullWritable, VectorizedRowBatch>,
VectorizedInputFormatInterface,
SelfDescribingInputFormatInterface,
CombineHiveInputFormat.AvoidSplitCombination {
    private final InputFormat sourceInputFormat;
    private final CombineHiveInputFormat.AvoidSplitCombination sourceASC;
    private final ColumnVectorProducer cvp;
    private final ListeningExecutorService executor;
    private final String hostName;

    LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp, ListeningExecutorService executor) {
        assert (sourceInputFormat instanceof OrcInputFormat);
        this.executor = executor;
        this.cvp = cvp;
        this.sourceInputFormat = sourceInputFormat;
        this.sourceASC = sourceInputFormat instanceof CombineHiveInputFormat.AvoidSplitCombination ? (CombineHiveInputFormat.AvoidSplitCombination)sourceInputFormat : null;
        this.hostName = HiveStringUtils.getHostname();
    }

    public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        boolean useLlapIo = true;
        if (split instanceof LlapAwareSplit) {
            useLlapIo = ((LlapAwareSplit)split).canUseLlapIo();
        }
        if (!useLlapIo) {
            LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
            RecordReader rr = this.sourceInputFormat.getRecordReader(split, job, reporter);
            return rr;
        }
        boolean isVectorMode = Utilities.isVectorMode((Configuration)job);
        if (!isVectorMode) {
            LlapIoImpl.LOG.error("No LLAP IO in non-vectorized mode");
            throw new UnsupportedOperationException("No LLAP IO in non-vectorized mode");
        }
        FileSplit fileSplit = (FileSplit)split;
        reporter.setStatus(fileSplit.toString());
        try {
            List includedCols = ColumnProjectionUtils.isReadAllColumns((Configuration)job) ? null : ColumnProjectionUtils.getReadColumnIDs((Configuration)job);
            return new LlapRecordReader(job, fileSplit, includedCols, this.hostName);
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        return this.sourceInputFormat.getSplits(job, numSplits);
    }

    public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
        return this.sourceASC == null ? false : this.sourceASC.shouldSkipCombine(path, conf);
    }

    private class LlapRecordReader
    implements RecordReader<NullWritable, VectorizedRowBatch>,
    Consumer<ColumnVectorBatch> {
        private final FileSplit split;
        private final List<Integer> columnIds;
        private final SearchArgument sarg;
        private final String[] columnNames;
        private final VectorizedRowBatchCtx rbCtx;
        private final boolean[] columnsToIncludeTruncated;
        private final Object[] partitionValues;
        private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList();
        private ColumnVectorBatch lastCvb = null;
        private boolean isFirst = true;
        private Throwable pendingError = null;
        private boolean isDone = false;
        private final boolean isClosed = false;
        private ConsumerFeedback<ColumnVectorBatch> feedback;
        private final QueryFragmentCounters counters;
        private long firstReturnTime;

        public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols, String hostName) throws IOException {
            this.split = split;
            this.columnIds = includedCols;
            this.sarg = ConvertAstToSearchArg.createFromConf((Configuration)job);
            this.columnNames = ColumnProjectionUtils.getReadColumnNames((Configuration)job);
            this.counters = new QueryFragmentCounters((Configuration)job);
            this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
            MapWork mapWork = Utilities.getMapWork((Configuration)job);
            this.rbCtx = mapWork.getVectorizedRowBatchCtx();
            this.columnsToIncludeTruncated = this.rbCtx.getColumnsToIncludeTruncated((Configuration)job);
            int partitionColumnCount = this.rbCtx.getPartitionColumnCount();
            if (partitionColumnCount > 0) {
                this.partitionValues = new Object[partitionColumnCount];
                VectorizedRowBatchCtx.getPartitionValues((VectorizedRowBatchCtx)this.rbCtx, (Configuration)job, (FileSplit)split, (Object[])this.partitionValues);
            } else {
                this.partitionValues = null;
            }
            this.startRead();
        }

        public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
            assert (value != null);
            boolean wasFirst = this.isFirst;
            if (this.isFirst) {
                if (this.partitionValues != null) {
                    this.rbCtx.addPartitionColsToBatch(value, this.partitionValues);
                }
                this.isFirst = false;
            }
            ColumnVectorBatch cvb = null;
            try {
                cvb = this.nextCvb();
            }
            catch (InterruptedException e) {
                this.feedback.stop();
                throw new IOException(e);
            }
            if (cvb == null) {
                if (wasFirst) {
                    this.firstReturnTime = this.counters.startTimeCounter();
                }
                this.counters.incrTimeCounter(QueryFragmentCounters.Counter.CONSUMER_TIME_US, this.firstReturnTime);
                return false;
            }
            if (this.columnIds.size() != cvb.cols.length) {
                throw new RuntimeException("Unexpected number of columns, VRB has " + this.columnIds.size() + " included, but the reader returned " + cvb.cols.length);
            }
            for (int i = 0; i < cvb.cols.length; ++i) {
                cvb.swapColumnVector(i, value.cols, this.columnIds.get(i));
            }
            value.selectedInUse = false;
            value.size = cvb.size;
            if (wasFirst) {
                this.firstReturnTime = this.counters.startTimeCounter();
            }
            return true;
        }

        private void startRead() {
            ReadPipeline rp;
            this.feedback = rp = LlapInputFormat.this.cvp.createReadPipeline(this, this.split, this.columnIds, this.sarg, this.columnNames, this.counters);
            ListenableFuture future = LlapInputFormat.this.executor.submit(rp.getReadCallable());
            Futures.addCallback((ListenableFuture)future, (FutureCallback)new UncaughtErrorHandler());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
            boolean isFirst;
            boolean bl = isFirst = this.lastCvb == null;
            if (!isFirst) {
                this.feedback.returnData(this.lastCvb);
            }
            LinkedList<ColumnVectorBatch> linkedList = this.pendingData;
            synchronized (linkedList) {
                boolean doLogBlocking;
                boolean bl2 = doLogBlocking = DebugUtils.isTraceMttEnabled() && this.isNothingToReport();
                if (doLogBlocking) {
                    LlapIoImpl.LOG.info("next will block");
                }
                while (this.isNothingToReport()) {
                    this.pendingData.wait(100L);
                }
                if (doLogBlocking) {
                    LlapIoImpl.LOG.info("next is unblocked");
                }
                this.rethrowErrorIfAny();
                this.lastCvb = this.pendingData.poll();
            }
            if (DebugUtils.isTraceMttEnabled() && this.lastCvb != null) {
                LlapIoImpl.LOG.info("Processing will receive vector " + this.lastCvb);
            }
            return this.lastCvb;
        }

        private boolean isNothingToReport() {
            return !this.isDone && this.pendingData.isEmpty() && this.pendingError == null;
        }

        public NullWritable createKey() {
            return NullWritable.get();
        }

        public VectorizedRowBatch createValue() {
            return this.rbCtx.createVectorizedRowBatch(this.columnsToIncludeTruncated);
        }

        public long getPos() throws IOException {
            return -1L;
        }

        public void close() throws IOException {
            if (DebugUtils.isTraceMttEnabled()) {
                LlapIoImpl.LOG.info("close called; closed false, done " + this.isDone + ", err " + this.pendingError + ", pending " + this.pendingData.size());
            }
            LlapIoImpl.LOG.info("Llap counters: {}", (Object)this.counters);
            this.feedback.stop();
            this.rethrowErrorIfAny();
        }

        private void rethrowErrorIfAny() throws IOException {
            if (this.pendingError == null) {
                return;
            }
            if (this.pendingError instanceof IOException) {
                throw (IOException)this.pendingError;
            }
            throw new IOException(this.pendingError);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void setDone() {
            if (DebugUtils.isTraceMttEnabled()) {
                LlapIoImpl.LOG.info("setDone called; closed false, done " + this.isDone + ", err " + this.pendingError + ", pending " + this.pendingData.size());
            }
            LinkedList<ColumnVectorBatch> linkedList = this.pendingData;
            synchronized (linkedList) {
                this.isDone = true;
                this.pendingData.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void consumeData(ColumnVectorBatch data) {
            if (DebugUtils.isTraceMttEnabled()) {
                LlapIoImpl.LOG.info("consume called; closed false, done " + this.isDone + ", err " + this.pendingError + ", pending " + this.pendingData.size());
            }
            LinkedList<ColumnVectorBatch> linkedList = this.pendingData;
            synchronized (linkedList) {
                this.pendingData.add(data);
                this.pendingData.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void setError(Throwable t) {
            this.counters.incrCounter(QueryFragmentCounters.Counter.NUM_ERRORS);
            LlapIoImpl.LOG.info("setError called; closed false, done " + this.isDone + ", err " + this.pendingError + ", pending " + this.pendingData.size());
            assert (t != null);
            LinkedList<ColumnVectorBatch> linkedList = this.pendingData;
            synchronized (linkedList) {
                this.pendingError = t;
                this.pendingData.notifyAll();
            }
        }

        public float getProgress() throws IOException {
            return 0.0f;
        }

        private final class UncaughtErrorHandler
        implements FutureCallback<Void> {
            private UncaughtErrorHandler() {
            }

            public void onSuccess(Void result) {
            }

            public void onFailure(Throwable t) {
                LlapIoImpl.LOG.error("Unhandled error from reader thread " + t.getMessage());
                LlapRecordReader.this.setError(t);
            }
        }
    }
}

