package org.apache.drill.exec.physical.impl.partitionsender;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.testing.CountDownLatchInjection;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.class */
/**
 * Fans a single partitioning operation out to every {@link Partitioner} in a list.
 *
 * <p>Each operation is wrapped in a {@code PartitionerTask} and handed to an executor; the thread that
 * created this decorator then waits for all tasks to finish, merges the per-partitioner stats into the
 * operator stats and rethrows the first task failure, if any.
 */
public final class PartitionerDecorator {
    // Class logger; also used by the nested PartitionerTask.
    private static final Logger logger = LoggerFactory.getLogger(PartitionerDecorator.class);
    // Test hook: supplies the "partitioner-sender-latch" latch and the "wait-for-fragment-interrupt" pause.
    private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PartitionerDecorator.class);
    // Partitioners every operation is dispatched to.
    private List<Partitioner> partitioners;
    // Operator-level stats that per-partitioner stats are merged into and whose wait time is adjusted.
    private final OperatorStats stats;
    // Runs the partitioner tasks: the fragment context's executor when parallel, otherwise a direct executor.
    private final ExecutorService executor;
    // Fragment context; provides execution controls, executor state and the fragment id string.
    private final FragmentContext context;
    // Thread that constructed this decorator; tasks unpark it and only it may cancel tasks.
    private final Thread thread;
    // Whether tasks run on the fragment context's executor (true) or inline on a direct executor (false).
    private final boolean enableParallelTaskExecution;

    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator$FlushBatchesHandlingClass.class */
    /**
     * {@link GeneralExecuteIface} implementation that asks a partitioner to flush its outgoing batches,
     * forwarding the two flags captured at construction time.
     */
    private static class FlushBatchesHandlingClass implements GeneralExecuteIface {
        private final boolean isLastBatch;
        private final boolean schemaChanged;

        /**
         * @param isLastBatch   first flag forwarded to {@code Partitioner.flushOutgoingBatches}
         * @param schemaChanged second flag forwarded to {@code Partitioner.flushOutgoingBatches}
         */
        public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
            this.isLastBatch = isLastBatch;
            this.schemaChanged = schemaChanged;
        }

        /**
         * Flushes the outgoing batches of the given partitioner.
         *
         * @param target partitioner to flush
         * @throws IOException if the partitioner's flush fails with one
         */
        @Override
        public void execute(Partitioner target) throws IOException {
            target.flushOutgoingBatches(isLastBatch, schemaChanged);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator$GeneralExecuteIface.class */
    /**
     * A single operation that can be applied to one {@link Partitioner}.
     * Implementations in this file partition an incoming batch or flush outgoing batches.
     */
    public interface GeneralExecuteIface {
        /**
         * Applies the operation to the given partitioner.
         *
         * @param target partitioner to operate on
         * @throws IOException if the underlying partitioner call fails with one
         */
        void execute(Partitioner target) throws IOException;
    }

    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator$PartitionBatchHandlingClass.class */
    /**
     * {@link GeneralExecuteIface} implementation that hands a captured incoming batch to a partitioner.
     */
    private static class PartitionBatchHandlingClass implements GeneralExecuteIface {
        private final RecordBatch incoming;

        /**
         * @param incoming batch that will be passed to {@code Partitioner.partitionBatch}
         */
        PartitionBatchHandlingClass(RecordBatch incoming) {
            this.incoming = incoming;
        }

        /**
         * Partitions the captured batch with the given partitioner.
         *
         * @param target partitioner that receives the batch
         * @throws IOException if the partitioner call fails with one
         */
        @Override
        public void execute(Partitioner target) throws IOException {
            target.partitionBatch(incoming);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator$PartitionerTask.class */
    /**
     * Runs one {@link GeneralExecuteIface} operation against one {@link Partitioner} and reports completion
     * to the owning {@link PartitionerDecorator}.
     *
     * <p>Completion is signalled by decrementing a shared counter; the task that brings the counter to zero
     * unparks the decorator's thread. A failure of the operation is captured as an
     * {@link ExecutionException} and exposed through {@link #getException()} instead of being thrown.
     */
    public static class PartitionerTask implements Runnable {
        /** Lifecycle of the task; only transitions out of {@link STATE#NEW} are contended. */
        private final AtomicReference<STATE> state = new AtomicReference<>(STATE.NEW);
        /** Thread that claimed the task: the worker in {@link #run()} or the decorator thread in {@link #cancel}. */
        private final AtomicReference<Thread> runner = new AtomicReference<>();
        private final PartitionerDecorator partitionerDecorator;
        /** Shared counter of outstanding tasks, decremented once per finished or removed task. */
        private final AtomicInteger count;
        private final GeneralExecuteIface iface;
        private final Partitioner partitioner;
        /** Latch awaited before the operation starts. */
        private final CountDownLatchInjection testCountDownLatch;
        /** Failure of the operation, or {@code null} if none was recorded. */
        private volatile ExecutionException exception;

        /** States a task moves through; see {@link #run()} and {@link #cancel(boolean)} for the transitions. */
        public enum STATE {
            NEW,
            COMPLETING,
            NORMAL,
            EXCEPTIONAL,
            CANCELLED,
            INTERRUPTING,
            INTERRUPTED
        }

        /**
         * @param partitionerDecorator   owning decorator; its thread is unparked when the counter hits zero
         * @param generalExecuteIface    operation to apply
         * @param partitioner            partitioner the operation is applied to
         * @param atomicInteger          shared counter of outstanding tasks
         * @param countDownLatchInjection latch awaited before the operation is executed
         */
        public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface generalExecuteIface, Partitioner partitioner, AtomicInteger atomicInteger, CountDownLatchInjection countDownLatchInjection) {
            this.partitionerDecorator = partitionerDecorator;
            this.iface = generalExecuteIface;
            this.partitioner = partitioner;
            this.count = atomicInteger;
            this.testCountDownLatch = countDownLatchInjection;
        }

        /**
         * Executes the operation unless the task was already claimed by another thread.
         *
         * <p>The completion sequence (state transition, counter decrement, thread-name restore, interrupt
         * clearing) lives in a single {@code finally} block so that it runs exactly once on every path.
         */
        @Override
        public void run() {
            final Thread worker = Thread.currentThread();
            if (!this.runner.compareAndSet(null, worker)) {
                // Already claimed, e.g. by cancel() before the task started.
                return;
            }
            final String originalName = worker.getName();
            worker.setName(String.format("Partitioner-%s-%d", this.partitionerDecorator.thread.getName(), Long.valueOf(worker.getId())));
            final OperatorStats localStats = this.partitioner.getStats();
            localStats.clear();
            localStats.startProcessing();
            ExecutionException failure = null;
            try {
                this.testCountDownLatch.await();
                // Skip the work if the task was cancelled or interrupted while waiting on the latch.
                if (this.state.get() == STATE.NEW) {
                    this.iface.execute(this.partitioner);
                }
            } catch (InterruptedException e) {
                if (this.state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
                    PartitionerDecorator.logger.warn("Partitioner Task interrupted during the run", e);
                }
            } catch (Throwable t) {
                failure = new ExecutionException(t);
            } finally {
                if (this.state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
                    if (failure == null) {
                        localStats.stopProcessing();
                        this.state.lazySet(STATE.NORMAL);
                    } else {
                        this.exception = failure;
                        this.state.lazySet(STATE.EXCEPTIONAL);
                    }
                }
                if (this.count.decrementAndGet() == 0) {
                    LockSupport.unpark(this.partitionerDecorator.thread);
                }
                worker.setName(originalName);
                // cancel(true) holds INTERRUPTING while it delivers the interrupt; wait for it to finish
                // so the flag is cleared only after the interrupt has actually arrived.
                while (this.state.get() == STATE.INTERRUPTING) {
                    Thread.yield();
                }
                // Clear the interrupt flag of the worker thread.
                Thread.interrupted();
            }
        }

        /**
         * Cancels the task. Must be called from the decorator's thread.
         *
         * <p>If the task has not been claimed by a worker yet, it is claimed here, removed from the
         * executor's queue when the executor is a {@link ThreadPoolExecutor}, and the shared counter is
         * decremented on its behalf. Otherwise the state is moved from {@code NEW} to {@code CANCELLED},
         * or, when {@code z} is {@code true}, the running worker is interrupted.
         *
         * @param z whether a worker that already claimed the task should be interrupted
         * @throws IllegalStateException if called from a thread other than the decorator's thread
         */
        public void cancel(boolean z) {
            Preconditions.checkState(Thread.currentThread() == this.partitionerDecorator.thread, String.format("PartitionerTask can be cancelled only from the main %s thread", this.partitionerDecorator.thread.getName()));
            if (this.runner.compareAndSet(null, this.partitionerDecorator.thread)) {
                if (this.partitionerDecorator.executor instanceof ThreadPoolExecutor) {
                    ((ThreadPoolExecutor) this.partitionerDecorator.executor).remove(this);
                }
                this.count.decrementAndGet();
            } else if (!z) {
                this.state.compareAndSet(STATE.NEW, STATE.CANCELLED);
            } else if (this.state.compareAndSet(STATE.NEW, STATE.INTERRUPTING)) {
                try {
                    this.runner.get().interrupt();
                } finally {
                    // Release the worker spinning on INTERRUPTING in run().
                    this.state.lazySet(STATE.INTERRUPTED);
                }
            }
        }

        /**
         * @return the failure recorded by {@link #run()}, or {@code null} if none was recorded
         */
        public ExecutionException getException() {
            return this.exception;
        }

        /**
         * @return the stats object of the partitioner this task operates on
         */
        public OperatorStats getStats() {
            return this.partitioner.getStats();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a decorator that enables parallel task execution only when more than one partitioner is given.
     *
     * @param partitioners partitioners to dispatch operations to
     * @param stats        operator stats that receive the merged per-partitioner metrics
     * @param context      fragment context supplying the executor and execution controls
     */
    public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
        this(partitioners, stats, context, partitioners.size() > 1);
    }

    /**
     * Creates a decorator with an explicit choice of execution mode.
     *
     * @param partitioners                partitioners to dispatch operations to
     * @param stats                       operator stats that receive the merged per-partitioner metrics
     * @param context                     fragment context supplying the executor and execution controls
     * @param enableParallelTaskExecution {@code true} to run tasks on the context's executor,
     *                                    {@code false} to run them on a direct executor
     */
    PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context, boolean enableParallelTaskExecution) {
        this.partitioners = partitioners;
        this.stats = stats;
        this.context = context;
        this.enableParallelTaskExecution = enableParallelTaskExecution;
        if (enableParallelTaskExecution) {
            this.executor = context.getExecutor();
        } else {
            this.executor = MoreExecutors.newDirectExecutorService();
        }
        // The constructing thread is the one tasks unpark and the only one allowed to cancel them.
        this.thread = Thread.currentThread();
    }

    /**
     * Partitions the given batch with every partitioner.
     *
     * @param incoming batch handed to each partitioner
     * @throws ExecutionException if any partitioner task failed or the tasks could not be submitted
     */
    public void partitionBatch(RecordBatch incoming) throws ExecutionException {
        GeneralExecuteIface operation = new PartitionBatchHandlingClass(incoming);
        executeMethodLogic(operation);
    }

    /**
     * Flushes the outgoing batches of every partitioner.
     *
     * @param isLastBatch   first flag forwarded to each partitioner's flush
     * @param schemaChanged second flag forwarded to each partitioner's flush
     * @throws ExecutionException if any partitioner task failed or the tasks could not be submitted
     */
    public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws ExecutionException {
        GeneralExecuteIface operation = new FlushBatchesHandlingClass(isLastBatch, schemaChanged);
        executeMethodLogic(operation);
    }

    /** Calls {@code initialize()} on every partitioner, in list order. */
    public void initialize() {
        for (Partitioner partitioner : partitioners) {
            partitioner.initialize();
        }
    }

    /** Calls {@code clear()} on every partitioner, in list order. */
    public void clear() {
        for (Partitioner partitioner : partitioners) {
            partitioner.clear();
        }
    }

    /**
     * Looks up an outgoing batch by index across all partitioners.
     *
     * @param index value passed to each partitioner's {@code getOutgoingBatch}
     * @return the first non-null batch returned by a partitioner, or {@code null} if every partitioner
     *         returned {@code null}
     */
    public PartitionOutgoingBatch getOutgoingBatches(int index) {
        for (Partitioner partitioner : partitioners) {
            PartitionOutgoingBatch candidate = partitioner.getOutgoingBatch(index);
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the partitioner list held by this decorator (the same instance, not a copy)
     */
    public List<Partitioner> getPartitioners() {
        return partitioners;
    }

    /**
     * Applies one operation to every partitioner and waits for all resulting tasks.
     *
     * <p>A task is created and submitted per partitioner, the injected latch that tasks await is counted
     * down, and the calling thread then blocks until every task has finished. Afterwards the per-task
     * stats are merged and the first recorded failure is thrown.
     *
     * <p>The wait/merge sequence lives in a single {@code finally} block so that it runs exactly once,
     * including when a task failed or the calling thread was interrupted.
     *
     * @param iface operation to apply to each partitioner
     * @throws ExecutionException if a task failed, or if the executor rejected a task
     */
    @VisibleForTesting
    void executeMethodLogic(GeneralExecuteIface iface) throws ExecutionException {
        try (CountDownLatchInjection testCountDownLatch =
                 injector.getLatch(this.context.getExecutionControls(), "partitioner-sender-latch")) {
            testCountDownLatch.initialize(1);
            final AtomicInteger count = new AtomicInteger();
            final List<PartitionerTask> tasks = new ArrayList<>(this.partitioners.size());
            ExecutionException failure = null;
            startWait();
            try {
                for (Partitioner partitioner : this.partitioners) {
                    createAndExecute(iface, testCountDownLatch, count, tasks, partitioner);
                }
                injector.injectInterruptiblePause(this.context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
                // Release the tasks blocked on the latch.
                testCountDownLatch.countDown();
            } catch (InterruptedException e) {
                // The interrupt is deliberately not re-asserted: await() below relies on
                // LockSupport.park(), which returns immediately while the interrupt flag is set.
                logger.warn("fragment thread interrupted", e);
            } catch (RejectedExecutionException e) {
                logger.warn("Failed to execute partitioner tasks. Execution service down?", e);
                failure = new ExecutionException(e);
            } finally {
                // Always wait for already-submitted tasks before touching their stats.
                await(count, tasks);
                stopWait();
                processPartitionerTasks(tasks, failure);
            }
        }
    }

    /**
     * Builds a task for one partitioner, submits it, and then records it.
     *
     * <p>Submission happens before the task is added to the list and counted, so an exception thrown by
     * {@code execute} leaves both the list and the counter untouched.
     *
     * @param iface     operation the task will apply
     * @param latch     latch the task awaits before running the operation
     * @param pending   counter of outstanding tasks
     * @param submitted list collecting the tasks that were submitted
     * @param target    partitioner the task operates on
     */
    private void createAndExecute(GeneralExecuteIface iface, CountDownLatchInjection latch, AtomicInteger pending, List<PartitionerTask> submitted, Partitioner target) {
        final PartitionerTask task = new PartitionerTask(this, iface, target, pending, latch);
        executor.execute(task);
        submitted.add(task);
        pending.incrementAndGet();
    }

    /**
     * Blocks the calling thread until the counter of outstanding tasks is no longer positive.
     *
     * <p>While waiting, the executor state is polled; the first time it reports that execution should not
     * continue, every task is cancelled with interruption. Cancellation is attempted at most once.
     *
     * @param pending counter of outstanding tasks
     * @param tasks   tasks to cancel if execution should stop
     */
    private void await(AtomicInteger pending, List<PartitionerTask> tasks) {
        boolean cancelRequested = false;
        while (pending.get() > 0) {
            if (!context.getExecutorState().shouldContinue() && !cancelRequested) {
                logger.warn("Cancelling fragment {} partitioner tasks...", context.getFragIdString());
                for (PartitionerTask task : tasks) {
                    task.cancel(true);
                }
                cancelRequested = true;
            } else {
                // Woken by the task that brings the counter to zero.
                LockSupport.park();
            }
        }
    }

    /** Starts the wait timer on the operator stats; does nothing when parallel execution is disabled. */
    private void startWait() {
        if (!enableParallelTaskExecution) {
            return;
        }
        stats.startWait();
    }

    /** Stops the wait timer on the operator stats; does nothing when parallel execution is disabled. */
    private void stopWait() {
        if (!enableParallelTaskExecution) {
            return;
        }
        stats.stopWait();
    }

    /**
     * Collects failures and stats from finished tasks.
     *
     * <p>The first failure seen (starting with {@code executionException}, if non-null) becomes the
     * primary one; the causes of later failures are added to its cause as suppressed. Stats of a task are
     * merged only while no failure has been seen. If no failure occurred, the operator wait time is
     * adjusted: in parallel mode by minus the largest task processing time, otherwise by plus the sum of
     * the task wait times.
     *
     * @param tasks              tasks to inspect, in submission order
     * @param executionException failure recorded before inspection, or {@code null}
     * @throws ExecutionException the primary failure, if any
     */
    private void processPartitionerTasks(List<PartitionerTask> tasks, ExecutionException executionException) throws ExecutionException {
        ExecutionException failure = executionException;
        long nanos = 0L;
        for (PartitionerTask task : tasks) {
            final ExecutionException taskFailure = task.getException();
            if (taskFailure != null) {
                if (failure != null) {
                    failure.getCause().addSuppressed(taskFailure.getCause());
                } else {
                    failure = taskFailure;
                }
            }
            if (failure != null) {
                // Once anything failed, stats are no longer merged.
                continue;
            }
            final OperatorStats taskStats = task.getStats();
            if (enableParallelTaskExecution) {
                nanos = Math.max(nanos, taskStats.getProcessingNanos());
            } else {
                nanos += taskStats.getWaitNanos();
            }
            stats.mergeMetrics(taskStats);
        }
        if (failure != null) {
            throw failure;
        }
        stats.adjustWaitNanos(enableParallelTaskExecution ? -nanos : nanos);
    }
}
