package org.apache.drill.exec.physical.impl.xsort.managed;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup;
import org.apache.drill.exec.physical.impl.xsort.managed.CopierHolder;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.class */
public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    private static final Logger logger;
    /** Test hook used to inject interruptions/exceptions at named sites (see the INTERRUPTION_* names). */
    protected static final ControlsInjector injector;
    // NOTE(review): the next two constants are not referenced by name in this
    // file as shown; the literal values may have been inlined at their use sites.
    private static final int MIN_MERGED_BATCH_SIZE = 262144;
    private static final float MERGE_BATCH_ALLOWANCE = 0.1f;
    /** Lower bound for the configured spill file size, in bytes (1 MiB). */
    private static final long MIN_SPILL_FILE_SIZE = 1048576;
    // Names of the fault-injection sites. Only the "spilling" and "merging"
    // sites are used in this file as shown.
    public static final String INTERRUPTION_AFTER_SORT = "after-sort";
    public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
    public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
    public static final String INTERRUPTION_WHILE_MERGING = "merging";
    public static final long DEFAULT_SPILL_BATCH_SIZE = 8388608;
    /** Lower bound applied to the preferred spill and merge batch sizes, in bytes. */
    public static final long MIN_SPILL_BATCH_SIZE = 262144;
    /** Upstream operator whose batches are sorted. */
    private final RecordBatch incoming;
    /** Allocator obtained from the operator context; its allocated-memory figure drives spill decisions. */
    private final BufferAllocator allocator;
    /** Schema of the data seen so far; merged with new schemas when union type is enabled. */
    private BatchSchema schema;
    /** Individually sorted input batches currently held in memory. */
    private LinkedList<BatchGroup.InputBatch> bufferedBatches;
    /** Runs that have been merged and written to spill files. */
    private LinkedList<BatchGroup.SpilledRun> spilledRuns;
    /** Selection vector produced by the in-memory sort; null when the spilled-merge path is used. */
    private SelectionVector4 sv4;
    /** Target number of rows per output batch of the final merge. */
    private int mergeBatchRowCount;
    /** Largest number of buffered batches observed; reported as a metric. */
    private int peakNumBatches;
    /** Memory budget: allocator limit, optionally reduced by configuration. */
    private long memoryLimit;
    /** Source of output batches during the DELIVER state. */
    private SortResults resultsIterator;
    private final SpillSet spillSet;
    private final CopierHolder copierHolder;
    /** Current phase of the operator; drives innerNext(). */
    private SortState sortState;
    /** Number of input records accepted so far. */
    private int inputRecordCount;
    /** Number of non-empty input batches accepted so far. */
    private int inputBatchCount;
    private final OperatorCodeGenerator opCodeGen;
    /** Largest net row width seen so far, in bytes (forced to 10 when it would be 0). */
    private int estimatedRowWidth;
    /** Estimated size in bytes of one output (merge) batch. */
    private long targetMergeBatchSize;
    /** Largest actual input batch size seen so far, in bytes. */
    private long estimatedInputBatchSize;
    /** Maximum number of runs to merge at once; at least 2 when configured. */
    private int mergeLimit;
    /** Target size of a spill file in bytes; at least 1 MiB. */
    private long spillFileSize;
    /** Smallest allocated-memory figure observed right after buffering a batch; reported as a metric. */
    private long minimumBufferSpace;
    /** Allocated-memory threshold at which buffering triggers a spill. */
    private long bufferMemoryPool;
    /** Memory budget used when deciding how many runs can be merged at once. */
    private long mergeMemoryPool;
    /** Configured merge batch size after clamping to the memory budget. */
    private long preferredMergeBatchSize;
    /** Sum of the actual sizes of all accepted input batches. */
    private long totalInputBytes;
    /** Configured spill batch size after clamping to [MIN_SPILL_BATCH_SIZE, memoryLimit / 4]. */
    private Long preferredSpillBatchSize;
    /** Highest average batch density seen; used to ignore unusually sparse batches. */
    private int maxDensity;
    /** Density of the most recent batch that was skipped as low-density. */
    private int lastDensity;
    /** Target number of rows per spilled batch. */
    private int spillBatchRowCount;
    /** Estimated size in bytes of one spilled batch. */
    private int targetSpillBatchSize;
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Operator metrics reported by this sort.
     *
     * <p>The metric id is the constant's ordinal, so the declaration order is
     * part of the contract: do not reorder, insert or remove constants.
     */
    public enum Metric implements MetricDef {
        /** Set to the spill set's file count each time a spill file is started. */
        SPILL_COUNT,
        // NOTE(review): never written in this class; presumably a placeholder
        // that keeps the ordinals of the following constants stable - confirm.
        RETIRED1,
        /** Largest number of batches buffered in memory at once. */
        PEAK_BATCHES_IN_MEMORY,
        /** Number of consolidation passes performed before the final merge. */
        MERGE_COUNT,
        /** Smallest allocated-memory figure observed right after buffering a batch. */
        MIN_BUFFER,
        /** Bytes written to spill files, rounded to whole megabytes; set on close. */
        SPILL_MB;

        /** Returns the metric id, which is this constant's ordinal. */
        @Override // org.apache.drill.exec.ops.MetricDef
        public int metricId() {
            return ordinal();
        }
    }

    /**
     * Iterator-like source of sorted output batches used during the DELIVER
     * state. Implementations visible from this class are the in-memory
     * {@code MergeSort} and {@code CopierHolder.BatchMerger}.
     */
    public interface SortResults {
        /**
         * Advances to the next output batch.
         *
         * @return true if a batch is available; false once the results are exhausted
         */
        boolean next();

        /** Releases resources held by the results; called when delivery ends or the operator closes. */
        void close();

        /** Returns the number of batches delivered (used for trace logging). */
        int getBatchCount();

        /** Returns the number of records delivered (used for trace logging). */
        int getRecordCount();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Phases of the sort operator, as dispatched by innerNext(). */
    public enum SortState {
        /** Initial state; the first load treats the already-fetched batch as OK_NEW_SCHEMA. */
        START,
        /** Reading, sorting and buffering (or spilling) upstream batches. */
        LOAD,
        /** Input exhausted; sorted batches are being returned from the results iterator. */
        DELIVER,
        /** No more output; innerNext() returns NONE. */
        DONE
    }

    /**
     * Creates the sort operator.
     *
     * @param externalSort    physical operator definition
     * @param fragmentContext fragment context supplying configuration and services
     * @param recordBatch     upstream batch to sort
     */
    public ExternalSortBatch(ExternalSort externalSort, FragmentContext fragmentContext, RecordBatch recordBatch) {
        super(externalSort, fragmentContext, true);

        // Inputs and memory.
        this.incoming = recordBatch;
        this.allocator = this.oContext.getAllocator();

        // Initial operator state.
        this.sortState = SortState.START;
        this.bufferedBatches = Lists.newLinkedList();
        this.spilledRuns = Lists.newLinkedList();
        this.inputRecordCount = 0;
        this.inputBatchCount = 0;
        this.peakNumBatches = -1;
        this.lastDensity = -1;

        // Collaborators; the copier needs both the allocator and the code generator.
        this.opCodeGen = new OperatorCodeGenerator(fragmentContext, externalSort);
        this.spillSet = new SpillSet(fragmentContext, externalSort, "sort", "run");
        this.copierHolder = new CopierHolder(fragmentContext, this.allocator, this.opCodeGen);

        // Reads the memory and batch-size limits; requires the allocator to be set.
        configure(fragmentContext.getConfig());
    }

    /**
     * Reads the sort's tuning options and derives the effective limits.
     *
     * <ul>
     *   <li>memory limit: allocator limit, lowered by the configured maximum when that is positive;</li>
     *   <li>merge limit: configured value (at least 2), or unlimited when not positive;</li>
     *   <li>spill file size: configured value, at least {@link #MIN_SPILL_FILE_SIZE};</li>
     *   <li>spill batch size: configured value clamped to
     *       [{@link #MIN_SPILL_BATCH_SIZE}, memoryLimit / 4], the lower bound winning;</li>
     *   <li>merge batch size: configured value, at most what is left after two spill
     *       batches, and at least {@code MIN_MERGED_BATCH_SIZE}.</li>
     * </ul>
     *
     * @param drillConfig configuration to read the options from
     */
    private void configure(DrillConfig drillConfig) {
        long limit = this.allocator.getLimit();
        final long configuredLimit = drillConfig.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY).longValue();
        if (configuredLimit > 0) {
            limit = Math.min(limit, configuredLimit);
        }
        this.memoryLimit = limit;

        // "No limit" is simply the largest int; it has nothing to do with planner settings.
        this.mergeLimit = getConfigLimit(drillConfig, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);

        final long configuredFileSize = drillConfig.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE).longValue();
        this.spillFileSize = Math.max(configuredFileSize, MIN_SPILL_FILE_SIZE);

        // Work in primitives and box once; the upper bound is applied first so
        // the minimum wins when memory is very tight.
        long spillBatchSize = drillConfig.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE).longValue();
        spillBatchSize = Math.min(spillBatchSize, limit / 4);
        spillBatchSize = Math.max(spillBatchSize, MIN_SPILL_BATCH_SIZE);
        this.preferredSpillBatchSize = Long.valueOf(spillBatchSize);

        long mergeBatchSize = drillConfig.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE).longValue();
        mergeBatchSize = Math.min(limit - (2 * spillBatchSize), mergeBatchSize);
        mergeBatchSize = Math.max(mergeBatchSize, MIN_MERGED_BATCH_SIZE);
        this.preferredMergeBatchSize = mergeBatchSize;

        logger.debug("Config: memory limit = {}, spill file size = {}, spill batch size = {}, merge limit = {}, merge batch size = {}", new Object[]{Long.valueOf(this.memoryLimit), Long.valueOf(this.spillFileSize), this.preferredSpillBatchSize, Integer.valueOf(this.mergeLimit), Long.valueOf(this.preferredMergeBatchSize)});
    }

    /**
     * Reads an integer limit from the configuration.
     *
     * @param drillConfig configuration to read from
     * @param str         configuration key
     * @param i           value returned when the configured value is not positive
     * @param i2          lower bound applied to a positive configured value
     * @return {@code i} if the configured value is zero or negative, otherwise the
     *         configured value raised to at least {@code i2}
     */
    private int getConfigLimit(DrillConfig drillConfig, String str, int i, int i2) {
        final int configured = drillConfig.getInt(str);
        if (configured <= 0) {
            return i;
        }
        return Math.max(configured, i2);
    }

    /**
     * Returns the number of records in the current output batch: the selection
     * vector's count when an SV4 is present, otherwise the output container's count.
     */
    @Override // org.apache.drill.exec.record.RecordBatch, org.apache.drill.exec.record.VectorAccessible
    public int getRecordCount() {
        if (this.sv4 == null) {
            return this.container.getRecordCount();
        }
        return this.sv4.getCount();
    }

    /**
     * Returns the selection vector produced by the in-memory sort, or null if
     * none has been created (it is only assigned in sortInMemory()).
     */
    @Override // org.apache.drill.exec.record.AbstractRecordBatch, org.apache.drill.exec.record.VectorAccessible
    public SelectionVector4 getSelectionVector4() {
        return this.sv4;
    }

    /**
     * Closes every batch group in the collection. A failure on one group is
     * reported to the fragment context and does not stop the remaining groups
     * from being closed.
     *
     * @param collection groups to close
     */
    private void closeBatchGroups(Collection<? extends BatchGroup> collection) {
        for (Iterator<? extends BatchGroup> groups = collection.iterator(); groups.hasNext(); ) {
            try {
                final BatchGroup group = groups.next();
                group.close();
            } catch (Exception e) {
                // Best effort: record the failure and keep releasing the rest.
                this.context.fail(e);
            }
        }
    }

    /**
     * Fetches the first upstream batch and uses it to publish an empty output
     * batch with the same fields. Terminal upstream outcomes are recorded in
     * the operator state instead.
     *
     * @throws IllegalStateException if upstream returns an unexpected outcome
     */
    @Override // org.apache.drill.exec.record.AbstractRecordBatch
    public void buildSchema() {
        final RecordBatch.IterOutcome outcome = next(this.incoming);

        // Deal with every outcome that does not carry a schema first.
        switch (outcome) {
            case NONE:
                this.state = AbstractRecordBatch.BatchState.DONE;
                return;
            case STOP:
                this.state = AbstractRecordBatch.BatchState.STOP;
                return;
            case OUT_OF_MEMORY:
                this.state = AbstractRecordBatch.BatchState.OUT_OF_MEMORY;
                return;
            case OK:
            case OK_NEW_SCHEMA:
                break;
            default:
                throw new IllegalStateException("Unexpected iter outcome: " + outcome);
        }

        // Mirror each incoming field into the output container.
        for (VectorWrapper<?> wrapper : this.incoming) {
            final ValueVector outputVector = this.container.addOrGet(wrapper.getField());
            if (outputVector instanceof AbstractContainerVector) {
                // Container vectors: build a transfer pair against the incoming
                // vector, then drop any data so only the structure remains.
                wrapper.getValueVector().makeTransferPair(outputVector);
                outputVector.clear();
            }
            outputVector.allocateNew();
        }
        this.container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
        this.container.setRecordCount(0);
    }

    /**
     * Produces the next outcome according to the current sort state: loads and
     * sorts input while in START/LOAD, returns sorted batches while in DELIVER,
     * and reports NONE once DONE.
     *
     * @throws IllegalStateException if the sort state is not one of the known values
     */
    @Override // org.apache.drill.exec.record.AbstractRecordBatch
    public RecordBatch.IterOutcome innerNext() {
        switch (this.sortState) {
            case START:
            case LOAD:
                return load();
            case DELIVER:
                return nextOutputBatch();
            case DONE:
                return RecordBatch.IterOutcome.NONE;
            default:
                throw new IllegalStateException("Unexpected sort state: " + this.sortState);
        }
    }

    /**
     * Delivers the next sorted batch from the results iterator. When the
     * iterator is exhausted, it is closed and released and the operator moves
     * to DONE.
     *
     * @return OK if a batch was produced, NONE when delivery is complete
     */
    private RecordBatch.IterOutcome nextOutputBatch() {
        if (!this.resultsIterator.next()) {
            logger.trace("Deliver phase complete: Returned {} batches, {} records", Integer.valueOf(this.resultsIterator.getBatchCount()), Integer.valueOf(this.resultsIterator.getRecordCount()));
            this.sortState = SortState.DONE;
            this.resultsIterator.close();
            this.resultsIterator = null;
            return RecordBatch.IterOutcome.NONE;
        }

        // Fault-injection site for tests, reached after each delivered batch.
        injector.injectUnchecked(this.context.getExecutionControls(), INTERRUPTION_WHILE_MERGING);
        return RecordBatch.IterOutcome.OK;
    }

    /**
     * Loads one upstream batch. On the very first call the batch already
     * fetched by buildSchema() is consumed and treated as OK_NEW_SCHEMA;
     * afterwards a new batch is requested from upstream.
     *
     * @return OK when a batch was processed (or memory was freed by spilling),
     *         STOP/NONE as received from upstream, or OUT_OF_MEMORY when upstream
     *         ran out of memory and too few batches are buffered to spill
     * @throws IllegalStateException if upstream returns an unexpected outcome
     */
    private RecordBatch.IterOutcome loadBatch() {
        final boolean firstCall = this.sortState == SortState.START;
        if (firstCall) {
            this.sortState = SortState.LOAD;
        }
        final RecordBatch.IterOutcome upstream =
                firstCall ? RecordBatch.IterOutcome.OK_NEW_SCHEMA : next(this.incoming);

        switch (upstream) {
            case STOP:
            case NONE:
                return upstream;

            case OK:
            case OK_NEW_SCHEMA:
                setupSchema(upstream);
                processBatch();
                return RecordBatch.IterOutcome.OK;

            case OUT_OF_MEMORY:
                logger.error("received OUT_OF_MEMORY, trying to spill");
                if (this.bufferedBatches.size() > 2) {
                    spillFromMemory();
                    return RecordBatch.IterOutcome.OK;
                }
                logger.error("not enough batches to spill, sending OUT_OF_MEMORY downstream");
                return RecordBatch.IterOutcome.OUT_OF_MEMORY;

            default:
                throw new IllegalStateException("Unexpected iter outcome: " + upstream);
        }
    }

    /**
     * Runs the load phase: consumes upstream batches until input ends or an
     * abnormal outcome is seen, then starts either the in-memory sort or the
     * merge of spilled runs.
     *
     * @return NONE when no records were received; the outcome of the chosen
     *         sort/merge start when input ended normally; otherwise the
     *         abnormal outcome returned while loading
     */
    private RecordBatch.IterOutcome load() {
        logger.trace("Start of load phase");
        this.container.clear();

        while (true) {
            final RecordBatch.IterOutcome outcome = loadBatch();
            if (outcome == RecordBatch.IterOutcome.OK) {
                continue;
            }
            if (outcome != RecordBatch.IterOutcome.NONE) {
                // STOP, OUT_OF_MEMORY, ...: hand it straight to the caller.
                return outcome;
            }

            // End of input.
            if (this.inputRecordCount == 0) {
                this.sortState = SortState.DONE;
                return RecordBatch.IterOutcome.NONE;
            }
            logger.debug("Completed load phase: read {} batches, spilled {} times, total input bytes: {}", new Object[]{Integer.valueOf(this.inputBatchCount), Integer.valueOf(this.spilledRuns.size()), Long.valueOf(this.totalInputBytes)});
            if (canUseMemoryMerge()) {
                return sortInMemory();
            }
            return mergeSpilledRuns();
        }
    }

    /**
     * Decides whether the buffered batches can be sorted entirely in memory.
     *
     * @return true only if nothing has been spilled, the unallocated part of
     *         the memory limit covers what the in-memory sort needs for the
     *         input record count, and at most 65535 batches are buffered
     */
    private boolean canUseMemoryMerge() {
        if (this.spillSet.hasSpilled()) {
            return false;
        }
        final long available = this.memoryLimit - this.allocator.getAllocatedMemory();
        final long needed = org.apache.drill.exec.physical.impl.xsort.MSortTemplate.memoryNeeded(this.inputRecordCount);
        if (available < needed) {
            return false;
        }
        return this.bufferedBatches.size() <= 65535;
    }

    /**
     * Establishes or updates the sort schema for the incoming batch and pushes
     * the resulting schema to all buffered batches and spilled runs.
     *
     * <p>Nothing happens when a schema is already set and either the outcome
     * is plain OK or the incoming schema equals the current one.
     *
     * @param iterOutcome outcome that accompanied the incoming batch
     * @throws UserException if the schema changed and union type is not enabled
     */
    private void setupSchema(RecordBatch.IterOutcome iterOutcome) {
        if (this.schema == null) {
            // First batch: adopt its schema as-is.
            this.schema = this.incoming.getSchema();
        } else {
            if (iterOutcome == RecordBatch.IterOutcome.OK) {
                return;
            }
            if (this.incoming.getSchema().equals(this.schema)) {
                return;
            }
            if (!this.unionTypeEnabled) {
                throw UserException.unsupportedError().message("Schema changes not supported in External Sort. Please enable Union type.", new Object[0]).build(logger);
            }
            this.schema = SchemaUtil.mergeSchemas(this.schema, this.incoming.getSchema());
            this.opCodeGen.setSchema(this.schema);
        }

        for (BatchGroup.InputBatch buffered : this.bufferedBatches) {
            buffered.setSchema(this.schema);
        }
        for (BatchGroup.SpilledRun run : this.spilledRuns) {
            run.setSchema(this.schema);
        }
    }

    /**
     * Coerces the incoming batch to the current sort schema.
     *
     * @return the coerced container, or null (after clearing the coerced
     *         vectors) when the incoming batch has no records
     */
    private VectorContainer convertBatch() {
        final VectorContainer converted = SchemaUtil.coerceContainer(this.incoming, this.schema, this.oContext);
        if (this.incoming.getRecordCount() == 0) {
            for (VectorWrapper<?> wrapper : converted) {
                wrapper.clear();
            }
            return null;
        }
        return converted;
    }

    /**
     * Returns a selection vector for the incoming batch: a copy of the
     * batch's own SV2 when it carries one, otherwise a newly allocated
     * identity vector.
     *
     * @return a selection vector owned by the caller
     */
    private SelectionVector2 makeSelectionVector() {
        final boolean incomingHasSv2 =
                this.incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE;
        if (incomingHasSv2) {
            // "m804clone" was a decompiler-generated alias; the method is clone().
            return this.incoming.getSelectionVector2().clone();
        }
        return newSV2();
    }

    /**
     * Accepts one incoming batch: spills first if the batch would not fit,
     * converts the batch to the sort schema, sorts it on its own through a
     * selection vector, updates statistics and memory estimates, and adds it
     * to the buffered batches. Empty batches are ignored.
     *
     * <p>Buffers taken over from the incoming batch are released if any step
     * fails before the batch has been added to the buffer list.
     *
     * @throws UserException if sorter setup or the sort reports a schema change
     */
    private void processBatch() {
        if (this.incoming.getRecordCount() == 0) {
            return;
        }

        final RecordBatchSizer sizer = analyzeIncomingBatch();
        if (isSpillNeeded(sizer.actualSize())) {
            spillFromMemory();
        }

        final long memoryBefore = this.allocator.getAllocatedMemory();
        if (memoryBefore > this.bufferMemoryPool) {
            logger.error("ERROR: Failed to spill above buffer limit. Buffer pool = {}, memory = {}", Long.valueOf(this.bufferMemoryPool), Long.valueOf(memoryBefore));
        }

        final VectorContainer converted = convertBatch();
        if (converted == null) {
            return;
        }

        final SelectionVector2 sv2;
        try {
            sv2 = makeSelectionVector();
        } catch (RuntimeException e) {
            // The converted container already owns the batch's buffers; do not leak them.
            converted.clear();
            throw e;
        }

        final long memoryAfter = this.allocator.getAllocatedMemory();
        final long memoryDelta = memoryAfter - memoryBefore;
        this.inputRecordCount += sv2.getCount();
        this.inputBatchCount++;
        this.totalInputBytes += sizer.actualSize();
        if (this.minimumBufferSpace == 0) {
            this.minimumBufferSpace = memoryAfter;
        } else {
            this.minimumBufferSpace = Math.min(this.minimumBufferSpace, memoryAfter);
        }
        this.stats.setLongStat(Metric.MIN_BUFFER, this.minimumBufferSpace);
        updateMemoryEstimates(memoryDelta, sizer);

        final SingleBatchSorter sorter = this.opCodeGen.getSorter(converted);
        try {
            sorter.setup(this.context, sv2, converted);
            sorter.sort(sv2);
        } catch (SchemaChangeException e) {
            // Neither the data nor its selection vector has an owner yet.
            converted.clear();
            sv2.clear();
            throw UserException.unsupportedError(e).message("Unexpected schema change.", new Object[0]).build(logger);
        }

        final RecordBatchData batchData = new RecordBatchData(converted, this.allocator);
        try {
            batchData.setSv2(sv2);
            this.bufferedBatches.add(new BatchGroup.InputBatch(batchData.getContainer(), batchData.getSv2(), this.oContext, sizer.netSize()));
            if (this.peakNumBatches < this.bufferedBatches.size()) {
                this.peakNumBatches = this.bufferedBatches.size();
                this.stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, this.peakNumBatches);
            }
        } catch (Throwable th) {
            batchData.clear();
            throw th;
        }
    }

    /**
     * Measures the incoming batch, including its selection vector, and logs
     * the measurements for the first batch only.
     *
     * @return the sizer holding the measurements of the incoming batch
     */
    private RecordBatchSizer analyzeIncomingBatch() {
        final RecordBatchSizer sizer = new RecordBatchSizer(this.incoming);
        sizer.applySv2();
        if (this.inputBatchCount == 0) {
            // Plain "{}" placeholder; the sizer is only rendered when debug logging is on.
            logger.debug("{}", sizer);
        }
        return sizer;
    }

    /**
     * Refines the row-width and batch-size estimates from a newly received
     * batch and, when either estimate grew, recomputes the spill/merge batch
     * sizes and the buffer and merge memory pools.
     *
     * <p>Empty batches are ignored. A batch whose average density is below
     * 3/4 of the highest density seen is ignored once per distinct density
     * value so that an unusually sparse batch does not skew the estimates.
     *
     * @param j                 change in allocated memory caused by taking the batch
     * @param recordBatchSizer  measurements of the batch
     */
    private void updateMemoryEstimates(long j, RecordBatchSizer recordBatchSizer) {
        final long actualBatchSize = recordBatchSizer.actualSize();
        final int rowCount = recordBatchSizer.rowCount();
        if (actualBatchSize != j) {
            logger.debug("Memory delta: {}, actual batch size: {}, Diff: {}", new Object[]{Long.valueOf(j), Long.valueOf(actualBatchSize), Long.valueOf(j - actualBatchSize)});
        }
        if (rowCount == 0) {
            return;
        }

        final int density = recordBatchSizer.avgDensity();
        if (density < (this.maxDensity * 3) / 4 && density != this.lastDensity) {
            logger.trace("Saw low density batch. Density: {}", Integer.valueOf(density));
            this.lastDensity = density;
            return;
        }
        this.maxDensity = Math.max(this.maxDensity, density);

        // Estimates only ever grow.
        final int previousRowWidth = this.estimatedRowWidth;
        this.estimatedRowWidth = Math.max(this.estimatedRowWidth, recordBatchSizer.netRowWidth());
        final long previousBatchSize = this.estimatedInputBatchSize;
        this.estimatedInputBatchSize = Math.max(this.estimatedInputBatchSize, actualBatchSize);
        if (this.estimatedRowWidth == 0) {
            // Avoid dividing by zero below.
            this.estimatedRowWidth = 10;
        }
        if (this.estimatedRowWidth == previousRowWidth && this.estimatedInputBatchSize == previousBatchSize) {
            return;
        }

        // Input batch plus four bytes per row.
        final long inMemoryBatchSize = this.estimatedInputBatchSize + (4L * rowCount);

        // Row counts are clamped while still long so that a very large
        // configured batch size cannot wrap around in the int cast.
        this.spillBatchRowCount = clampBatchRowCount((this.preferredSpillBatchSize.longValue() / this.estimatedRowWidth) / 2);
        this.targetSpillBatchSize = this.spillBatchRowCount * this.estimatedRowWidth * 2;

        this.mergeBatchRowCount = clampBatchRowCount((this.preferredMergeBatchSize / this.estimatedRowWidth) / 2);
        // long arithmetic: rows * width * 2 can exceed the int range.
        this.targetMergeBatchSize = 2L * this.mergeBatchRowCount * this.estimatedRowWidth;

        final long minSpillMemory = this.estimatedInputBatchSize + (2L * this.targetSpillBatchSize);
        long minMergeMemory = (2L * this.targetSpillBatchSize) + this.targetMergeBatchSize;
        if (minMergeMemory > this.memoryLimit) {
            // Not enough room for the preferred merge batch: shrink it, but
            // never to fewer than one row per output batch.
            this.targetMergeBatchSize = Math.max(this.estimatedRowWidth, (this.memoryLimit - (2L * this.targetSpillBatchSize)) / 2);
            this.mergeBatchRowCount = clampBatchRowCount((this.targetMergeBatchSize / this.estimatedRowWidth) / 2);
            minMergeMemory = (2L * this.targetSpillBatchSize) + this.targetMergeBatchSize;
        }

        this.bufferMemoryPool = this.memoryLimit - minSpillMemory;
        this.mergeMemoryPool = Math.max(this.memoryLimit - minMergeMemory, (long) ((this.memoryLimit - (3 * this.targetMergeBatchSize)) * 0.95d));

        final long minMemoryNeeds = Math.max(minSpillMemory + inMemoryBatchSize, minMergeMemory);
        if (minMemoryNeeds > this.memoryLimit) {
            logger.warn("Potential memory overflow! Minumum needed = {} bytes, actual available = {} bytes", Long.valueOf(minMemoryNeeds), Long.valueOf(this.memoryLimit));
        }

        logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records", new Object[]{Integer.valueOf(this.estimatedRowWidth), Long.valueOf(this.estimatedInputBatchSize), Integer.valueOf(rowCount)});
        logger.debug("Spill batch size = {} bytes, {} records; spill file size: {} bytes", new Object[]{Integer.valueOf(this.targetSpillBatchSize), Integer.valueOf(this.spillBatchRowCount), Long.valueOf(this.spillFileSize)});
        logger.debug("Output batch size = {} bytes, {} records", Long.valueOf(this.targetMergeBatchSize), Integer.valueOf(this.mergeBatchRowCount));
        logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}", new Object[]{Long.valueOf(this.memoryLimit), Long.valueOf(this.bufferMemoryPool), Long.valueOf(this.mergeMemoryPool)});
    }

    /** Clamps a computed row count to the range [1, 65535] before narrowing it to int. */
    private static int clampBatchRowCount(long rows) {
        return (int) Math.min(65535L, Math.max(1L, rows));
    }

    /**
     * Decides whether buffered batches must be spilled before accepting a
     * batch of the given size.
     *
     * @param i size in bytes of the batch about to be buffered
     * @return true if at least two batches are buffered and the allocated
     *         memory plus {@code i} reaches the buffer memory pool
     */
    private boolean isSpillNeeded(int i) {
        if (this.bufferedBatches.size() < 2) {
            return false;
        }
        final long projected = this.allocator.getAllocatedMemory() + i;
        return projected >= this.bufferMemoryPool;
    }

    /**
     * Sorts all buffered batches in memory and prepares to deliver the result
     * through the merge sorter.
     *
     * <p>The sorter is closed exactly once if the merge yields no selection
     * vector or fails; on success, ownership passes to the results iterator,
     * which is closed later.
     *
     * @return OK_NEW_SCHEMA when results are ready for delivery, STOP when the
     *         merge produced no selection vector
     */
    private RecordBatch.IterOutcome sortInMemory() {
        logger.debug("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}", new Object[]{Integer.valueOf(this.bufferedBatches.size()), Integer.valueOf(this.inputRecordCount), Long.valueOf(this.allocator.getAllocatedMemory())});

        final MergeSort memoryMerge = new MergeSort(this.context, this.allocator, this.opCodeGen);
        boolean handedOff = false;
        try {
            this.sv4 = memoryMerge.merge(this.bufferedBatches, this, this.container);
            if (this.sv4 == null) {
                this.sortState = SortState.DONE;
                return RecordBatch.IterOutcome.STOP;
            }
            logger.debug("Completed in-memory sort. Memory = {}", Long.valueOf(this.allocator.getAllocatedMemory()));
            this.resultsIterator = memoryMerge;
            handedOff = true;
            this.sortState = SortState.DELIVER;
            return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
        } finally {
            // Still ours on the STOP and failure paths: release it here.
            if (!handedOff) {
                memoryMerge.close();
            }
        }
    }

    /**
     * Starts the final merge when data has been spilled: repeatedly
     * consolidates buffered batches and spilled runs until they can be merged
     * in one pass, then opens the final merger over everything that remains.
     *
     * @return OK_NEW_SCHEMA; the first merged batch has already been produced
     */
    private RecordBatch.IterOutcome mergeSpilledRuns() {
        logger.debug("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}", new Object[]{Integer.valueOf(this.inputBatchCount), Integer.valueOf(this.inputRecordCount), Long.valueOf(this.allocator.getAllocatedMemory()), Integer.valueOf(this.bufferedBatches.size()), Integer.valueOf(this.spilledRuns.size())});

        int consolidationPasses = 0;
        while (consolidateBatches()) {
            consolidationPasses++;
        }
        this.stats.addLongStat(Metric.MERGE_COUNT, consolidationPasses);

        // Move every remaining batch and run into one list; the merger takes
        // them over, so the operator's own lists are emptied.
        final LinkedList<BatchGroup> allRuns = new LinkedList<>();
        allRuns.addAll(this.bufferedBatches);
        this.bufferedBatches.clear();
        allRuns.addAll(this.spilledRuns);
        this.spilledRuns.clear();

        logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}", Integer.valueOf(allRuns.size()), Long.valueOf(this.allocator.getAllocatedMemory()));
        final CopierHolder.BatchMerger finalMerger = this.copierHolder.startFinalMerge(this.schema, allRuns, this.container, this.mergeBatchRowCount);
        // Produce the first output batch now; its result flag is not needed here.
        finalMerger.next();
        this.resultsIterator = finalMerger;
        this.sortState = SortState.DELIVER;
        return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
    }

    /**
     * Performs one consolidation step towards a single-pass final merge.
     *
     * <p>Buffered batches are spilled when there are more spilled runs than
     * can be merged at once, when batches plus runs exceed the merge limit, or
     * when the runs' batches plus currently allocated memory would not fit in
     * the merge memory pool. Otherwise, if there are still too many spilled
     * runs, some of them are merged into a new run.
     *
     * @return true if a step was taken and the caller should call again,
     *         false when everything can be merged in one pass
     */
    private boolean consolidateBatches() {
        final int inMemoryCount = this.bufferedBatches.size();
        final int spilledCount = this.spilledRuns.size();
        final boolean haveBuffered = inMemoryCount > 0;

        // Runs that can be merged at once: bounded by configuration and by
        // memory, but never fewer than two.
        final int memoryMergeLimit = (int) (this.mergeMemoryPool / this.targetSpillBatchSize);
        final int maxMergeWidth = Math.max(Math.min(this.mergeLimit, memoryMergeLimit), 2);

        if (haveBuffered
                && (spilledCount > maxMergeWidth
                    || inMemoryCount + spilledCount > this.mergeLimit
                    || (spilledCount * this.targetSpillBatchSize) + this.allocator.getAllocatedMemory() > this.mergeMemoryPool)) {
            spillFromMemory();
            return true;
        }

        final int excessRuns = spilledCount - maxMergeWidth;
        if (excessRuns <= 0) {
            return false;
        }
        if (haveBuffered) {
            spillFromMemory();
            return true;
        }

        final int runsToMerge = Math.min(Math.max(2, excessRuns) + 1, maxMergeWidth);
        logger.trace("Merging {} on-disk runs, Alloc. memory = {}", Integer.valueOf(runsToMerge), Long.valueOf(this.allocator.getAllocatedMemory()));
        mergeAndSpill(this.spilledRuns, runsToMerge);
        return true;
    }

    /**
     * Spills buffered batches from the head of the buffer list. Batches are
     * counted until their accumulated data size, plus half of the last counted
     * batch, exceeds the spill file size; at least two batches are spilled
     * when that many are buffered.
     */
    private void spillFromMemory() {
        int batchCount = 0;
        long accumulatedSize = 0;
        for (BatchGroup.InputBatch batch : this.bufferedBatches) {
            final long batchSize = batch.getDataSize();
            accumulatedSize += batchSize;
            batchCount++;
            if (accumulatedSize + (batchSize / 2) > this.spillFileSize) {
                break;
            }
        }

        final int atLeastTwo = Math.max(batchCount, 2);
        final int spillCount = Math.min(atLeastTwo, this.bufferedBatches.size());
        mergeAndSpill(this.bufferedBatches, spillCount);
    }

    /**
     * Merges the first {@code i} groups of the list into a new spilled run and
     * records that run.
     *
     * @param linkedList groups to take from (head first)
     * @param i          maximum number of groups to merge
     */
    private void mergeAndSpill(LinkedList<? extends BatchGroup> linkedList, int i) {
        final BatchGroup.SpilledRun newRun = doMergeAndSpill(linkedList, i);
        this.spilledRuns.add(newRun);
    }

    /**
     * Removes up to {@code i} groups from the head of the list, merges them
     * and writes the merged batches to a new spill file.
     *
     * <p>The removed groups and the merger are always closed, whether or not
     * the spill succeeds. If the spill fails, the partially written run is
     * closed as well and the failure is reported as a {@link UserException}
     * (an existing {@code UserException} is rethrown unchanged).
     *
     * @param linkedList groups to take from (head first)
     * @param i          maximum number of groups to merge
     * @return the newly written spilled run
     * @throws UserException if merging or writing fails
     */
    private BatchGroup.SpilledRun doMergeAndSpill(LinkedList<? extends BatchGroup> linkedList, int i) {
        final ArrayList<BatchGroup> batchesToSpill = Lists.newArrayList();
        final int spillCount = Math.min(linkedList.size(), i);
        if (!$assertionsDisabled && spillCount <= 0) {
            throw new AssertionError("Spill count to mergeAndSpill must not be zero");
        }
        for (int n = 0; n < spillCount; n++) {
            batchesToSpill.add(linkedList.pollFirst());
        }

        final String outputFile = this.spillSet.getNextSpillFile();
        this.stats.setLongStat(Metric.SPILL_COUNT, this.spillSet.getFileCount());

        BatchGroup.SpilledRun newRun = null;
        // Resources are closed in reverse order: the merger first, then the
        // groups that were removed from the list.
        try (AutoCloseable spilledGroups = AutoCloseables.all(batchesToSpill);
             CopierHolder.BatchMerger merger = this.copierHolder.startMerge(this.schema, batchesToSpill, this.spillBatchRowCount)) {
            logger.trace("Spilling {} of {} batches, {} rows, memory = {}, write to {}", new Object[]{Integer.valueOf(batchesToSpill.size()), Integer.valueOf(this.bufferedBatches.size() + batchesToSpill.size()), Integer.valueOf(this.spillBatchRowCount), Long.valueOf(this.allocator.getAllocatedMemory()), outputFile});
            newRun = new BatchGroup.SpilledRun(this.spillSet, outputFile, this.oContext);
            while (merger.next()) {
                newRun.addBatch(merger.getOutput());
            }
            // Fault-injection site for tests.
            injector.injectChecked(this.context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
            newRun.closeOutputStream();
            logger.trace("mergeAndSpill: completed, memory = {}, spilled {} records to {}", new Object[]{Long.valueOf(this.allocator.getAllocatedMemory()), Integer.valueOf(merger.getRecordCount()), outputFile});
            return newRun;
        } catch (Throwable e) {
            // Only the half-written run needs explicit cleanup; everything
            // else was closed by the try-with-resources above.
            if (newRun != null) {
                try {
                    AutoCloseables.close(e, new AutoCloseable[]{newRun});
                } catch (Throwable ignored) {
                    // Closing may hit the same I/O problem; the original
                    // failure is what gets reported.
                }
            }
            if (e instanceof UserException) {
                throw (UserException) e;
            }
            throw UserException.resourceError(e).message("External Sort encountered an error while spilling to disk", new Object[0]).build(logger);
        }
    }

    /**
     * Allocates an identity selection vector for the incoming batch: entry
     * {@code k} refers to row {@code k}.
     *
     * @return the populated selection vector
     * @throws UserException if the vector's buffer cannot be allocated
     */
    private SelectionVector2 newSV2() {
        final SelectionVector2 sv2 = new SelectionVector2(this.allocator);
        final boolean allocated = sv2.allocateNewSafe(this.incoming.getRecordCount());
        if (!allocated) {
            throw UserException.resourceError(new OutOfMemoryException("Unable to allocate sv2 buffer")).build(logger);
        }

        int row = 0;
        while (row < this.incoming.getRecordCount()) {
            sv2.setIndex(row, (char) row);
            row++;
        }
        sv2.setRecordCount(this.incoming.getRecordCount());
        return sv2;
    }

    /**
     * Not supported by the sort operator.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.drill.exec.record.AbstractRecordBatch, org.apache.drill.exec.record.RecordBatch
    public WritableBatch getWritableBatch() {
        throw new UnsupportedOperationException("A sort batch is not writable.");
    }

    /**
     * Forwards a kill request to the upstream batch.
     *
     * @param z passed through unchanged to the upstream batch's kill()
     */
    @Override // org.apache.drill.exec.record.AbstractRecordBatch
    protected void killIncoming(boolean z) {
        this.incoming.kill(z);
    }

    /**
     * Releases everything held by the sort: buffered batches, spilled runs,
     * the selection vector, the results iterator, the copier, the spill set,
     * the code generator and finally the base-class resources.
     *
     * <p>Every step is attempted even if an earlier one fails; the first
     * runtime exception encountered is rethrown once all steps have run.
     * Spill statistics are logged and published before cleanup starts.
     */
    @Override // org.apache.drill.exec.record.AbstractRecordBatch, java.lang.AutoCloseable
    public void close() {
        final long writeBytes = this.spillSet.getWriteBytes();
        if (writeBytes > 0) {
            logger.debug("End of sort. Total write bytes: {}, Total read bytes: {}", Long.valueOf(writeBytes), Long.valueOf(this.spillSet.getReadBytes()));
        }
        this.stats.setLongStat(Metric.SPILL_MB, (int) Math.round((writeBytes / 1024.0d) / 1024.0d));

        // Remember only the first failure; later ones must not mask it.
        RuntimeException firstFailure = null;
        try {
            if (this.bufferedBatches != null) {
                closeBatchGroups(this.bufferedBatches);
                this.bufferedBatches = null;
            }
        } catch (RuntimeException e) {
            firstFailure = e;
        }
        try {
            if (this.spilledRuns != null) {
                closeBatchGroups(this.spilledRuns);
                this.spilledRuns = null;
            }
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        try {
            if (this.sv4 != null) {
                this.sv4.clear();
            }
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        try {
            if (this.resultsIterator != null) {
                this.resultsIterator.close();
                this.resultsIterator = null;
            }
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        try {
            this.copierHolder.close();
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        try {
            this.spillSet.close();
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        try {
            this.opCodeGen.close();
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        try {
            super.close();
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    // Initializes the class-level finals: the assertion flag (true when
    // assertions are disabled for this class), the logger and the
    // fault-injection hook.
    static {
        $assertionsDisabled = !ExternalSortBatch.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(ExternalSortBatch.class);
        injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
    }
}
