package org.apache.drill.exec.physical.impl.partitionsender;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.testing.CountDownLatchInjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.class */
/**
 * Applies one operation to every {@link Partitioner} in a list and merges each partitioner's
 * {@link OperatorStats} into the operator-level stats supplied at construction.
 *
 * <p>With exactly one partitioner the operation runs inline on the calling thread. Otherwise one
 * task per partitioner is submitted to the executor taken from the drillbit context, and the
 * calling thread blocks on a {@link CountDownLatch} until every task has counted down.
 */
public class PartitionerDecorator {
    private static final Logger logger = LoggerFactory.getLogger(PartitionerDecorator.class);
    private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PartitionerDecorator.class);

    private List<Partitioner> partitioners;
    /** Operator-level stats that the per-partitioner stats are merged into. */
    private final OperatorStats stats;
    /** Name of the thread that constructed this decorator. */
    private final String tName = Thread.currentThread().getName();
    /** Prefix for the temporary name given to worker threads while they run a partitioner task. */
    private final String childThreadPrefix = "Partitioner-" + this.tName + "-";
    private final ExecutorService executor;
    private final FragmentContext context;

    /**
     * Task that runs one {@link GeneralExecuteIface} operation against one {@link Partitioner}.
     *
     * <p>An {@link IOException} raised by the operation is not propagated from {@link #run()};
     * it is stored and exposed through {@link #getException()}.
     */
    public static class CustomRunnable implements Runnable {
        private final String parentThreadName;
        private final CountDownLatch latch;
        private final GeneralExecuteIface iface;
        private final Partitioner part;
        private CountDownLatchInjection testCountDownLatch;
        // volatile: written by the worker thread, read by the thread that submitted the task.
        private volatile IOException exp;

        /**
         * @param str prefix used to build the worker thread's temporary name
         * @param countDownLatch latch counted down once when this task finishes, normally or not
         * @param generalExecuteIface operation to run against the partitioner
         * @param partitioner partitioner the operation is applied to
         * @param countDownLatchInjection injected latch awaited before the operation starts
         */
        public CustomRunnable(String str, CountDownLatch countDownLatch, GeneralExecuteIface generalExecuteIface, Partitioner partitioner, CountDownLatchInjection countDownLatchInjection) {
            this.parentThreadName = str;
            this.latch = countDownLatch;
            this.iface = generalExecuteIface;
            this.part = partitioner;
            this.testCountDownLatch = countDownLatchInjection;
        }

        /**
         * Awaits the injected latch, then runs the operation under a temporary thread name while
         * timing it on the partitioner's stats. The stats are stopped, the thread name restored
         * and the completion latch counted down exactly once, whatever the outcome.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                this.testCountDownLatch.await();
            } catch (InterruptedException e) {
                // Logged only; the task still goes on to run the operation below.
                PartitionerDecorator.logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
            }

            final Thread worker = Thread.currentThread();
            final String originalName = worker.getName();
            final OperatorStats localStats = this.part.getStats();
            try {
                worker.setName(this.parentThreadName + worker.getId());
                localStats.clear();
                localStats.startProcessing();
                this.iface.execute(this.part);
            } catch (IOException e) {
                // Kept for the submitting thread to collect via getException().
                this.exp = e;
            } finally {
                // Single cleanup path: unchecked failures propagate after this block has run.
                localStats.stopProcessing();
                worker.setName(originalName);
                this.latch.countDown();
            }
        }

        /** @return the {@link IOException} raised by the operation, or {@code null} if none was recorded */
        public IOException getException() {
            return this.exp;
        }

        /** @return the partitioner this task operates on */
        public Partitioner getPart() {
            return this.part;
        }
    }

    /** Operation that calls {@link Partitioner#flushOutgoingBatches(boolean, boolean)} with fixed flags. */
    private static class FlushBatchesHandlingClass implements GeneralExecuteIface {
        private final boolean isLastBatch;
        private final boolean schemaChanged;

        public FlushBatchesHandlingClass(boolean z, boolean z2) {
            this.isLastBatch = z;
            this.schemaChanged = z2;
        }

        @Override // org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator.GeneralExecuteIface
        public void execute(Partitioner partitioner) throws IOException {
            partitioner.flushOutgoingBatches(this.isLastBatch, this.schemaChanged);
        }
    }

    /** A single operation that can be applied to a {@link Partitioner}. */
    public interface GeneralExecuteIface {
        /**
         * @param partitioner the partitioner to operate on
         * @throws IOException if the underlying partitioner call fails with one
         */
        void execute(Partitioner partitioner) throws IOException;
    }

    /** Operation that calls {@link Partitioner#partitionBatch(RecordBatch)} with a fixed incoming batch. */
    private static class PartitionBatchHandlingClass implements GeneralExecuteIface {
        private final RecordBatch incoming;

        public PartitionBatchHandlingClass(RecordBatch recordBatch) {
            this.incoming = recordBatch;
        }

        @Override // org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator.GeneralExecuteIface
        public void execute(Partitioner partitioner) throws IOException {
            partitioner.partitionBatch(this.incoming);
        }
    }

    /**
     * @param list partitioners to delegate to; the list is stored as-is, not copied
     * @param operatorStats operator-level stats that per-partitioner stats are merged into
     * @param fragmentContext source of the executor and of the execution controls used for injection
     */
    public PartitionerDecorator(List<Partitioner> list, OperatorStats operatorStats, FragmentContext fragmentContext) {
        this.partitioners = list;
        this.stats = operatorStats;
        this.context = fragmentContext;
        this.executor = fragmentContext.getDrillbitContext().getExecutor();
    }

    /**
     * Calls {@code partitionBatch(recordBatch)} on every partitioner.
     *
     * @param recordBatch the incoming batch handed to each partitioner
     * @throws IOException if any partitioner fails with one
     */
    public void partitionBatch(RecordBatch recordBatch) throws IOException {
        executeMethodLogic(new PartitionBatchHandlingClass(recordBatch));
    }

    /**
     * Calls {@code flushOutgoingBatches(z, z2)} on every partitioner.
     *
     * @param z passed through as the "is last batch" flag
     * @param z2 passed through as the "schema changed" flag
     * @throws IOException if any partitioner fails with one
     */
    public void flushOutgoingBatches(boolean z, boolean z2) throws IOException {
        executeMethodLogic(new FlushBatchesHandlingClass(z, z2));
    }

    /** Calls {@code initialize()} on every partitioner, in list order. */
    public void initialize() {
        for (Partitioner partitioner : this.partitioners) {
            partitioner.initialize();
        }
    }

    /** Calls {@code clear()} on every partitioner, in list order. */
    public void clear() {
        for (Partitioner partitioner : this.partitioners) {
            partitioner.clear();
        }
    }

    /**
     * Asks each partitioner in turn for the outgoing batch at the given index.
     *
     * @param i index passed to each partitioner's {@code getOutgoingBatch}
     * @return the first non-null batch returned, or {@code null} if every partitioner returns {@code null}
     */
    public PartitionOutgoingBatch getOutgoingBatches(int i) {
        for (Partitioner partitioner : this.partitioners) {
            PartitionOutgoingBatch outgoingBatch = partitioner.getOutgoingBatch(i);
            if (outgoingBatch != null) {
                return outgoingBatch;
            }
        }
        return null;
    }

    /** @return the partitioner list held by this decorator (the same instance, not a copy) */
    @VisibleForTesting
    public List<Partitioner> getPartitioners() {
        return this.partitioners;
    }

    /**
     * Runs {@code generalExecuteIface} against every partitioner and merges their stats into the
     * operator-level stats.
     *
     * <p>If several tasks record an {@link IOException}, the first one in list order is thrown and
     * the others are attached to it as suppressed exceptions.
     *
     * @param generalExecuteIface the operation to apply to each partitioner
     * @throws IOException if at least one partitioner raised one
     */
    protected void executeMethodLogic(GeneralExecuteIface generalExecuteIface) throws IOException {
        if (this.partitioners.size() == 1) {
            // Single partitioner: run inline on the calling thread, no executor involved.
            final Partitioner only = this.partitioners.get(0);
            final OperatorStats singleStats = only.getStats();
            singleStats.clear();
            singleStats.startProcessing();
            try {
                generalExecuteIface.execute(only);
            } finally {
                singleStats.stopProcessing();
                this.stats.mergeMetrics(singleStats);
                // Carry the partitioner's own wait time over to the operator-level stats.
                this.stats.adjustWaitNanos(singleStats.getWaitNanos());
            }
            return;
        }

        long maxProcessingNanos = 0;
        // The calling thread only waits from here on; the wait time is corrected in the finally block.
        this.stats.startWait();
        final CountDownLatch completionLatch = new CountDownLatch(this.partitioners.size());
        final List<CustomRunnable> tasks = new ArrayList<>(this.partitioners.size());
        final List<Future<?>> futures = new ArrayList<>(this.partitioners.size());
        CountDownLatchInjection testLatch = null;
        try {
            // Every task awaits this injected latch before doing work; it is released in the loop below.
            testLatch = injector.getLatch(this.context.getExecutionControls(), "partitioner-sender-latch");
            testLatch.initialize(1);
            for (Partitioner partitioner : this.partitioners) {
                CustomRunnable task = new CustomRunnable(this.childThreadPrefix, completionLatch, generalExecuteIface, partitioner, testLatch);
                tasks.add(task);
                futures.add(this.executor.submit(task));
            }

            while (true) {
                try {
                    injector.injectInterruptiblePause(this.context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
                    injector.getLatch(this.context.getExecutionControls(), "partitioner-sender-latch").countDown();
                    completionLatch.await();
                    break;
                } catch (InterruptedException e) {
                    // An interrupt while the fragment should still continue is ignored and the wait retried.
                    if (!this.context.shouldContinue()) {
                        logger.debug("Interrupting partioner threads. Fragment thread {}", this.tName);
                        for (Future<?> future : futures) {
                            future.cancel(true);
                        }
                        // NOTE(review): the loop then waits on the completion latch again. A task
                        // cancelled before it started running would presumably never count the
                        // latch down, which could block here indefinitely - confirm against the
                        // executor in use before changing.
                    }
                }
            }

            IOException firstFailure = null;
            for (CustomRunnable task : tasks) {
                IOException failure = task.getException();
                if (failure != null) {
                    if (firstFailure == null) {
                        firstFailure = failure;
                    } else {
                        firstFailure.addSuppressed(failure);
                    }
                }
                OperatorStats localStats = task.getPart().getStats();
                // Track the longest processing time reported by any partitioner.
                maxProcessingNanos = Math.max(maxProcessingNanos, localStats.getProcessingNanos());
                this.stats.mergeMetrics(localStats);
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        } finally {
            // Single cleanup path for success and failure alike.
            this.stats.stopWait();
            // Subtract the longest partitioner processing time from the recorded wait time.
            this.stats.adjustWaitNanos(-maxProcessingNanos);
            if (testLatch != null) {
                testLatch.close();
            }
        }
    }
}
