package org.apache.drill.exec.physical.impl.partitionsender;

import com.carrotsearch.hppc.IntArrayList;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JType;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.class */
public class PartitionSenderRootExec extends BaseRootExec {
    private static final Logger logger = LoggerFactory.getLogger(PartitionSenderRootExec.class);
    // Upstream batch that innerNext() pulls from and hands to the partitioner.
    private final RecordBatch incoming;
    // Sender configuration; the constructor stores the very same object in popConfig too.
    private final HashPartitionSender operator;
    // Null until createClassInstances() has run; replaced whenever a new schema arrives.
    private PartitionerDecorator partitioner;
    // Fragment context used for options, code generation, data tunnels and the fragment handle.
    private final ExchangeFragmentContext context;
    // Number of destinations listed in the sender configuration (one outgoing batch per receiver).
    private final int outGoingBatchCount;
    // Same object as operator (see constructor).
    private final HashPartitionSender popConfig;
    // Output row count taken from the child operator's cost; feeds the thread-count estimate.
    private final double cost;
    // One slot per receiver, flipped 0 -> 1 in receivingFragmentFinished() so each receiver is counted once.
    private final AtomicIntegerArray remainingReceivers;
    // Count of receivers that have not yet reported completion (name spelling is as in the original).
    private final AtomicInteger remaingReceiverCount;
    // Set once every receiver has finished; innerNext() then cancels the incoming batch.
    private boolean done;
    // True until the first schema-carrying batch has been handled in innerNext().
    private boolean first;
    // When true, close() casts incoming to CloseableRecordBatch and closes it.
    private final boolean closeIncoming;
    // Running minimum of per-receiver record totals; seeded with Long.MAX_VALUE in the constructor.
    long minReceiverRecordCount;
    // Running maximum of per-receiver record totals; seeded with Long.MIN_VALUE in the constructor.
    long maxReceiverRecordCount;
    // Preliminary sending-thread count derived from options and cost in the constructor.
    protected final int numberPartitions;
    // numberPartitions capped at outGoingBatchCount; the number of Partitioner instances created.
    protected final int actualPartitions;
    // Minor fragment ids of receivers that finished before a partitioner existed; replayed in createClassInstances().
    private final IntArrayList terminations;

    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec$Metric.class */
    /**
     * Metrics this operator publishes through its operator stats.
     *
     * <p>The metric id is the constant's declaration position, so constants must
     * never be reordered and new ones must only be appended.
     */
    public enum Metric implements MetricDef {
        /** Incremented by one in {@code sendEmptyBatch}. */
        BATCHES_SENT,
        /** Not written in this class. */
        RECORDS_SENT,
        /** Smallest per-receiver record total, set in {@code updateAggregateStats}. */
        MIN_RECORDS,
        /** Largest per-receiver record total, set in {@code updateAggregateStats}. */
        MAX_RECORDS,
        /** Number of destinations, set in the constructor. */
        N_RECEIVERS,
        /** Not written in this class. */
        BYTES_SENT,
        /** Value of {@code actualPartitions}, set in the constructor. */
        SENDING_THREADS_COUNT,
        /** Child output row count, set in the constructor. */
        COST;

        /**
         * @return the position of this constant in the declaration order
         */
        @Override // org.apache.drill.exec.ops.MetricDef
        public int metricId() {
            return this.ordinal();
        }
    }

    /**
     * Creates a partition sender that leaves the incoming batch open when the
     * sender itself is closed (delegates with {@code false} as the last argument).
     *
     * @param rootFragmentContext fragment context the sender runs in
     * @param recordBatch upstream batch to partition
     * @param hashPartitionSender sender configuration
     * @throws OutOfMemoryException declared by the delegated constructor
     */
    public PartitionSenderRootExec(RootFragmentContext rootFragmentContext, RecordBatch recordBatch, HashPartitionSender hashPartitionSender) throws OutOfMemoryException {
        this(rootFragmentContext, recordBatch, hashPartitionSender, false);
    }

    /**
     * Creates a partition sender and works out how many partitioner instances
     * (sending threads) it will use.
     *
     * <p>The thread estimate is {@code cost / sliceTarget / receivers / threadFactor},
     * rounded and floored at 1. An explicit "set threads" option greater than zero
     * overrides the estimate; otherwise the estimate is capped by the "max threads"
     * option. The final count never exceeds the number of receivers.
     *
     * @param rootFragmentContext fragment context the sender runs in
     * @param recordBatch upstream batch to partition
     * @param hashPartitionSender sender configuration (destinations, expression, child cost)
     * @param z when {@code true}, {@code close()} also closes {@code recordBatch}
     * @throws OutOfMemoryException declared by the superclass construction path
     */
    public PartitionSenderRootExec(RootFragmentContext rootFragmentContext, RecordBatch recordBatch, HashPartitionSender hashPartitionSender, boolean z) throws OutOfMemoryException {
        super(rootFragmentContext, rootFragmentContext.newOperatorContext(hashPartitionSender, null), hashPartitionSender);

        // Initial values for the mutable state.
        this.first = true;
        this.minReceiverRecordCount = Long.MAX_VALUE;
        this.maxReceiverRecordCount = Long.MIN_VALUE;
        this.terminations = new IntArrayList();

        // Collaborators handed in by the caller.
        this.incoming = recordBatch;
        this.operator = hashPartitionSender;
        this.closeIncoming = z;
        this.context = rootFragmentContext;

        // Receiver bookkeeping: one slot and one count per destination.
        final int receiverCount = hashPartitionSender.getDestinations().size();
        this.outGoingBatchCount = receiverCount;
        this.popConfig = hashPartitionSender;
        this.remainingReceivers = new AtomicIntegerArray(receiverCount);
        this.remaingReceiverCount = new AtomicInteger(receiverCount);
        this.stats.setLongStat(Metric.N_RECEIVERS, receiverCount);
        this.cost = hashPartitionSender.getChild().getCost().getOutputRowCount();

        final OptionManager options = rootFragmentContext.getOptions();
        final long sliceTarget = options.getOption(ExecConstants.SLICE_TARGET).num_val.longValue();
        final int threadFactor = options.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue();

        // Estimate only when both divisors are non-zero; otherwise fall back to one thread.
        int estimatedThreads = 1;
        if (sliceTarget != 0 && receiverCount != 0) {
            final double slicesPerReceiver = this.cost / (double) sliceTarget / (double) receiverCount;
            estimatedThreads = Math.max(1, (int) Math.round(slicesPerReceiver / (double) threadFactor));
        }

        // The max-threads option is only consulted when no explicit thread count is set.
        final int forcedThreads = options.getOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName()).num_val.intValue();
        this.numberPartitions = forcedThreads > 0
                ? forcedThreads
                : Math.min(estimatedThreads, options.getOption(PlannerSettings.PARTITION_SENDER_MAX_THREADS.getOptionName()).num_val.intValue());
        logger.info("Preliminary number of sending threads is: " + this.numberPartitions);

        // Never run more partitioners than there are receivers.
        this.actualPartitions = Math.min(receiverCount, this.numberPartitions);
        this.stats.setLongStat(Metric.SENDING_THREADS_COUNT, this.actualPartitions);
        this.stats.setDoubleStat(Metric.COST, this.cost);
    }

    /**
     * Pulls one batch from upstream and routes it to the receivers.
     *
     * <p>Once all receivers have finished ({@code done}), the incoming batch is
     * cancelled and the call behaves as if upstream returned {@code NONE}. The
     * very first {@code OK} is treated as {@code OK_NEW_SCHEMA} so that a
     * partitioner is always created before any data is partitioned.
     *
     * @return {@code true} after a batch was partitioned, {@code false} once the
     *         final flush (or final empty batch) has been sent
     * @throws UserException (data write error) when flushing or partitioning fails
     *         with an {@link ExecutionException}
     * @throws IllegalStateException for {@code NOT_YET} or any other unexpected outcome
     */
    @Override // org.apache.drill.exec.physical.impl.BaseRootExec
    public boolean innerNext() {
        RecordBatch.IterOutcome iterOutcome;
        if (this.done) {
            this.incoming.cancel();
            iterOutcome = RecordBatch.IterOutcome.NONE;
        } else {
            iterOutcome = next(this.incoming);
        }
        logger.debug("Partitioner.next(): got next record batch with status {}", iterOutcome);
        if (this.first && iterOutcome == RecordBatch.IterOutcome.OK) {
            iterOutcome = RecordBatch.IterOutcome.OK_NEW_SCHEMA;
        }
        switch (iterOutcome) {
            case NONE:
                try {
                    if (this.partitioner != null) {
                        // Flags are presumably (isLastBatch, schemaChanged) - confirm against PartitionerDecorator.
                        this.partitioner.flushOutgoingBatches(true, false);
                    } else {
                        // No partitioner was ever built, so no data was sent; emit a final empty batch.
                        sendEmptyBatch(true);
                    }
                    return false;
                } catch (ExecutionException e) {
                    throw UserException.dataWriteError(e).addContext("Error while creating partitioning sender or flushing outgoing batches").build(logger);
                }
            case OK_NEW_SCHEMA:
                try {
                    // Drain batches built for the previous schema before replacing the partitioner.
                    if (this.partitioner != null) {
                        this.partitioner.flushOutgoingBatches(false, true);
                        this.partitioner.clear();
                    }
                    // The finally wraps only the setup work, so stopSetup() runs exactly once
                    // per startSetup() even if a later step in this case fails.
                    try {
                        this.stats.startSetup();
                        createPartitioner();
                    } finally {
                        this.stats.stopSetup();
                    }
                    if (this.first) {
                        this.first = false;
                        // Announce the schema to every receiver before any data arrives.
                        sendEmptyBatch(false);
                    }
                } catch (ExecutionException e2) {
                    throw UserException.dataWriteError(e2).addContext("Error while flushing outgoing batches").build(logger);
                }
                break;
            case OK:
                break;
            case NOT_YET:
            default:
                throw new IllegalStateException("Unexpected outcome from incoming batch: " + iterOutcome);
        }
        try {
            this.partitioner.partitionBatch(this.incoming);
            // The incoming vectors are cleared once the batch has been handed to the partitioner.
            VectorAccessibleUtilities.clear((VectorAccessible) this.incoming);
            return true;
        } catch (ExecutionException e3) {
            throw UserException.dataWriteError(e3).addContext("Error while partitioning outgoing batches").build(logger);
        }
    }

    /**
     * Builds the partitioner for the current incoming schema using
     * {@code actualPartitions} instances. The list returned by
     * {@code createClassInstances} is discarded here; that method stores the
     * resulting decorator in {@code this.partitioner}.
     */
    @VisibleForTesting
    protected void createPartitioner() {
        createClassInstances(this.actualPartitions);
    }

    /**
     * Generates, compiles and sets up {@code i} partitioner instances, wraps them
     * in a {@link PartitionerDecorator} stored in {@code this.partitioner}, and
     * replays any receiver terminations that were recorded before a partitioner
     * existed.
     *
     * <p>The generated evaluation code computes {@code abs(expr % outGoingBatchCount)}
     * as the destination bucket. The receivers are split into {@code i} contiguous
     * index ranges of {@code max(1, outGoingBatchCount / i)} each; the first
     * {@code outGoingBatchCount % i} ranges get one extra receiver and the last
     * range always ends at {@code outGoingBatchCount}.
     *
     * <p>If anything fails after the instances were created - during setup,
     * decorator construction or termination replay - every instance is cleared
     * before the exception propagates.
     *
     * @param i number of partitioner instances to create
     * @return the partitioner instances that were set up
     */
    private List<Partitioner> createClassInstances(int i) {
        LogicalExpression expr = this.operator.getExpr();
        ErrorCollectorImpl errorCollector = new ErrorCollectorImpl();
        ClassGenerator root = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, this.context.getOptions());
        root.getCodeGenerator().plainJavaCapable(true);
        ClassGenerator<?> innerGenerator = root.getInnerGenerator("OutgoingRecordBatch");
        LogicalExpression materialized = ExpressionTreeMaterializer.materialize(expr, this.incoming, errorCollector, this.context.getFunctionRegistry());
        errorCollector.reportErrors(logger);

        // Generated code: int bucket = <expr> % outGoingBatchCount; return Math.abs(bucket);
        JExpression bucket = JExpr.direct("bucket");
        root.getEvalBlock().decl(JType.parse(root.getModel(), "int"), "bucket", root.addExpr(materialized).getValue().mod(JExpr.lit(this.outGoingBatchCount)));
        root.getEvalBlock()._return(root.getModel().ref(Math.class).staticInvoke("abs").arg(bucket));
        CopyUtil.generateCopies(innerGenerator, this.incoming, this.incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE);

        List<Partitioner> partitioners = this.context.getImplementationClass(root, i);
        int divisor = Math.max(1, this.outGoingBatchCount / i);
        int longTail = this.outGoingBatchCount % i;
        int endIndex = 0;
        boolean success = false;
        try {
            for (int index = 0; index < i; index++) {
                int startIndex = endIndex;
                // The last instance absorbs whatever remains up to outGoingBatchCount.
                endIndex = index < i - 1 ? startIndex + divisor : this.outGoingBatchCount;
                if (index < longTail) {
                    endIndex++;
                }
                partitioners.get(index).setup(this.context, this.incoming, this.popConfig, new OperatorStats(this.stats, true), this.oContext, innerGenerator, startIndex, endIndex);
            }
            this.partitioner = new PartitionerDecorator(partitioners, this.stats, this.context);
            // Receivers that finished before the partitioner existed are terminated now.
            for (int t = 0; t < this.terminations.size(); t++) {
                this.partitioner.getOutgoingBatches(this.terminations.buffer[t]).terminate();
            }
            this.terminations.clear();
            success = true;
        } catch (SchemaChangeException e) {
            throw AbstractRecordBatch.schemaChangeException(e, "Partition Sender", logger);
        } finally {
            // Release the instances on any failure path.
            if (!success) {
                for (Partitioner created : partitioners) {
                    created.clear();
                }
            }
        }
        return partitioners;
    }

    /**
     * Folds the total record count of every outgoing batch of every partitioner
     * into the running minimum and maximum, then publishes both values as the
     * {@code MIN_RECORDS} and {@code MAX_RECORDS} metrics.
     */
    private void updateAggregateStats() {
        for (Partitioner worker : this.partitioner.getPartitioners()) {
            for (PartitionOutgoingBatch outgoing : worker.getOutgoingBatches()) {
                final long sent = outgoing.getTotalRecords();
                if (sent < this.minReceiverRecordCount) {
                    this.minReceiverRecordCount = sent;
                }
                if (sent > this.maxReceiverRecordCount) {
                    this.maxReceiverRecordCount = sent;
                }
            }
        }
        this.stats.setLongStat(Metric.MIN_RECORDS, this.minReceiverRecordCount);
        this.stats.setLongStat(Metric.MAX_RECORDS, this.maxReceiverRecordCount);
    }

    /**
     * Records that a receiving fragment needs no more data.
     *
     * <p>Each receiver is processed at most once: the per-receiver slot is flipped
     * from 0 to 1 atomically and repeat notifications are ignored. If no
     * partitioner exists yet the receiver id is queued for later termination;
     * otherwise its outgoing batch is terminated immediately. When the last
     * receiver reports in, {@code done} is set.
     *
     * @param fragmentHandle handle of the finished receiver; its minor fragment id
     *        is used as the receiver index
     */
    @Override // org.apache.drill.exec.physical.impl.BaseRootExec, org.apache.drill.exec.physical.impl.RootExec
    public void receivingFragmentFinished(ExecProtos.FragmentHandle fragmentHandle) {
        final int receiverId = fragmentHandle.getMinorFragmentId();
        final boolean firstNotification = this.remainingReceivers.compareAndSet(receiverId, 0, 1);
        if (!firstNotification) {
            return;
        }
        if (this.partitioner != null) {
            this.partitioner.getOutgoingBatches(receiverId).terminate();
        } else {
            this.terminations.add(receiverId);
        }
        final int stillActive = this.remaingReceiverCount.decrementAndGet();
        if (stillActive == 0) {
            this.done = true;
        }
    }

    /**
     * Shuts the sender down: closes the superclass state, publishes the
     * min/max record metrics and clears the partitioner if one was built, and
     * finally closes the incoming batch when this sender was asked to do so.
     *
     * @throws Exception propagated from the superclass or from closing the incoming batch
     */
    @Override // org.apache.drill.exec.physical.impl.BaseRootExec, java.lang.AutoCloseable
    public void close() throws Exception {
        logger.debug("Partition sender stopping.");
        super.close();

        final boolean partitionerBuilt = this.partitioner != null;
        if (partitionerBuilt) {
            updateAggregateStats();
            this.partitioner.clear();
        }

        if (!this.closeIncoming) {
            return;
        }
        final CloseableRecordBatch closeableIncoming = (CloseableRecordBatch) this.incoming;
        closeableIncoming.close();
    }

    /**
     * Sends one empty, schema-only batch to every destination.
     *
     * <p>If the incoming batch has no schema, an empty schema is built so the
     * message can still be constructed. Time spent in each send is accounted as
     * wait time in the operator stats.
     *
     * @param z forwarded as the first argument of
     *        {@code FragmentWritableBatch.getEmptyBatchWithSchema}; callers in this
     *        class pass {@code true} at end of stream and {@code false} for the
     *        initial schema announcement (presumably an "is last batch" flag - confirm)
     */
    private void sendEmptyBatch(boolean z) {
        BatchSchema schema = this.incoming.getSchema();
        if (schema == null) {
            schema = BatchSchema.newBuilder().build();
        }
        ExecProtos.FragmentHandle handle = this.context.getHandle();
        for (MinorFragmentEndpoint destination : this.popConfig.getDestinations()) {
            AccountingDataTunnel tunnel = this.context.getDataTunnel(destination.getEndpoint());
            FragmentWritableBatch emptyBatch = FragmentWritableBatch.getEmptyBatchWithSchema(z, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), this.operator.getOppositeMajorFragmentId(), destination.getId(), schema);
            this.stats.startWait();
            try {
                tunnel.sendRecordBatch(emptyBatch);
            } finally {
                // Pairs exactly one stopWait() with each startWait(), whether or not the send fails.
                this.stats.stopWait();
            }
        }
        // NOTE(review): counted once per call, not once per destination - confirm this is the intended metric.
        this.stats.addLongStat(Metric.BATCHES_SENT, 1L);
    }

    /**
     * @return the current partitioner decorator, or {@code null} if none has been created yet
     */
    @VisibleForTesting
    protected PartitionerDecorator getPartitioner() {
        return this.partitioner;
    }
}
