package org.apache.drill.exec.work.fragment;

import java.io.IOException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.drill.common.DeferredException;
import org.apache.drill.common.EventProcessor;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.planner.sql.parser.impl.DrillParserImplConstants;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.FailureUtils;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/fragment/FragmentExecutor.class */
public class FragmentExecutor implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(FragmentExecutor.class);
    private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentExecutor.class);
    private final String fragmentName;
    private final ExecutorFragmentContext fragmentContext;
    private final FragmentStatusReporter statusReporter;
    private final DeferredException deferredException;
    private final BitControl.PlanFragment fragment;
    private final FragmentRoot rootOperator;
    private volatile RootExec root;
    private final AtomicReference<UserBitShared.FragmentState> fragmentState;
    private final Queue<ExecProtos.FragmentHandle> receiverFinishedQueue;
    private final FragmentEventProcessor eventProcessor;
    private final AtomicReference<Thread> myThreadRef;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.drill.exec.work.fragment.FragmentExecutor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/drill/exec/work/fragment/FragmentExecutor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState;

        static {
            try {
                $SwitchMap$org$apache$drill$exec$work$fragment$FragmentExecutor$EventType[EventType.CANCEL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$work$fragment$FragmentExecutor$EventType[EventType.CANCEL_AND_FINISH.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$work$fragment$FragmentExecutor$EventType[EventType.RECEIVER_FINISHED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState = new int[UserBitShared.FragmentState.values().length];
            try {
                $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[UserBitShared.FragmentState.SENDING.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[UserBitShared.FragmentState.AWAITING_ALLOCATION.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[UserBitShared.FragmentState.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[UserBitShared.FragmentState.CANCELLATION_REQUESTED.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[UserBitShared.FragmentState.FINISHED.ordinal()] = 5;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[UserBitShared.FragmentState.FAILED.ordinal()] = 6;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[UserBitShared.FragmentState.CANCELLED.ordinal()] = 7;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/fragment/FragmentExecutor$EventType.class */
    public enum EventType {
        CANCEL,
        CANCEL_AND_FINISH,
        RECEIVER_FINISHED
    }

    /* loaded from: input_file:org/apache/drill/exec/work/fragment/FragmentExecutor$ExecutorStateImpl.class */
    private class ExecutorStateImpl implements FragmentContext.ExecutorState {
        private ExecutorStateImpl() {
        }

        @Override // org.apache.drill.exec.ops.FragmentContext.ExecutorState
        public boolean shouldContinue() {
            return FragmentExecutor.this.shouldContinue();
        }

        @Override // org.apache.drill.exec.ops.FragmentContext.ExecutorState
        public void fail(Throwable th) {
            FragmentExecutor.this.fail(th);
        }

        @Override // org.apache.drill.exec.ops.FragmentContext.ExecutorState
        public boolean isFailed() {
            return FragmentExecutor.this.fragmentState.get() == UserBitShared.FragmentState.FAILED;
        }

        @Override // org.apache.drill.exec.ops.FragmentContext.ExecutorState
        public Throwable getFailureCause() {
            return FragmentExecutor.this.deferredException.getException();
        }

        @Override // org.apache.drill.exec.ops.FragmentContext.ExecutorState
        public void checkContinue() {
            if (!shouldContinue()) {
                throw new QueryCancelledException();
            }
        }

        /* synthetic */ ExecutorStateImpl(FragmentExecutor fragmentExecutor, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/drill/exec/work/fragment/FragmentExecutor$FragmentDrillbitStatusListener.class */
    private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
        private FragmentDrillbitStatusListener() {
        }

        @Override // org.apache.drill.exec.work.foreman.DrillbitStatusListener
        public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> set) {
        }

        @Override // org.apache.drill.exec.work.foreman.DrillbitStatusListener
        public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> set) {
            CoordinationProtos.DrillbitEndpoint foremanEndpoint = FragmentExecutor.this.fragmentContext.getForemanEndpoint();
            if (set.contains(foremanEndpoint)) {
                FragmentExecutor.logger.warn("Foreman {} no longer active.  Cancelling fragment {}.", foremanEndpoint.getAddress(), QueryIdHelper.getQueryIdentifier(FragmentExecutor.this.fragmentContext.getHandle()));
                FragmentExecutor.this.statusReporter.close();
                FragmentExecutor.this.cancel();
            }
        }

        /* synthetic */ FragmentDrillbitStatusListener(FragmentExecutor fragmentExecutor, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/fragment/FragmentExecutor$FragmentEvent.class */
    public class FragmentEvent {
        private final EventType type;
        private final ExecProtos.FragmentHandle handle;

        FragmentEvent(EventType eventType, ExecProtos.FragmentHandle fragmentHandle) {
            this.type = eventType;
            this.handle = fragmentHandle;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/fragment/FragmentExecutor$FragmentEventProcessor.class */
    public class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
        private final AtomicBoolean terminate;
        static final /* synthetic */ boolean $assertionsDisabled;

        private FragmentEventProcessor() {
            this.terminate = new AtomicBoolean(false);
        }

        void cancel() {
            sendEvent(new FragmentEvent(EventType.CANCEL, null));
        }

        void cancelAndFinish() {
            sendEvent(new FragmentEvent(EventType.CANCEL_AND_FINISH, null));
        }

        void receiverFinished(ExecProtos.FragmentHandle fragmentHandle) {
            sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, fragmentHandle));
        }

        public void terminate() {
            this.terminate.set(true);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void processEvent(FragmentEvent fragmentEvent) {
            if (fragmentEvent.type.equals(EventType.RECEIVER_FINISHED)) {
                if (this.terminate.get()) {
                    return;
                }
            } else if (!this.terminate.compareAndSet(false, true)) {
                return;
            }
            switch (fragmentEvent.type) {
                case CANCEL:
                    FragmentExecutor.this.updateState(UserBitShared.FragmentState.CANCELLATION_REQUESTED);
                    killThread();
                    return;
                case CANCEL_AND_FINISH:
                    FragmentExecutor.this.updateState(UserBitShared.FragmentState.CANCELLATION_REQUESTED);
                    FragmentExecutor.this.cleanup(UserBitShared.FragmentState.FINISHED);
                    return;
                case RECEIVER_FINISHED:
                    if (!$assertionsDisabled && fragmentEvent.handle == null) {
                        throw new AssertionError("RECEIVER_FINISHED event must have a handle");
                    }
                    if (FragmentExecutor.this.root == null) {
                        FragmentExecutor.logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.", QueryIdHelper.getFragmentId(FragmentExecutor.this.getContext().getHandle()), QueryIdHelper.getFragmentId(fragmentEvent.handle));
                        return;
                    } else {
                        FragmentExecutor.logger.info("Applying request for early sender termination for {} -> {}.", QueryIdHelper.getQueryIdentifier(FragmentExecutor.this.getContext().getHandle()), QueryIdHelper.getFragmentId(fragmentEvent.handle));
                        FragmentExecutor.this.receiverFinishedQueue.add(fragmentEvent.handle);
                        return;
                    }
                default:
                    return;
            }
        }

        private void killThread() {
            Thread thread = (Thread) FragmentExecutor.this.myThreadRef.get();
            FragmentExecutor.logger.debug("Interrupting fragment thread {}", thread.getName());
            thread.interrupt();
        }

        /* synthetic */ FragmentEventProcessor(FragmentExecutor fragmentExecutor, AnonymousClass1 anonymousClass1) {
            this();
        }

        static {
            $assertionsDisabled = !FragmentExecutor.class.desiredAssertionStatus();
        }
    }

    public FragmentExecutor(ExecutorFragmentContext executorFragmentContext, BitControl.PlanFragment planFragment, FragmentStatusReporter fragmentStatusReporter) {
        this(executorFragmentContext, planFragment, fragmentStatusReporter, null);
    }

    public FragmentExecutor(ExecutorFragmentContext executorFragmentContext, BitControl.PlanFragment planFragment, FragmentStatusReporter fragmentStatusReporter, FragmentRoot fragmentRoot) {
        this.deferredException = new DeferredException();
        this.fragmentState = new AtomicReference<>(UserBitShared.FragmentState.AWAITING_ALLOCATION);
        this.receiverFinishedQueue = new ConcurrentLinkedQueue();
        this.eventProcessor = new FragmentEventProcessor(this, null);
        this.myThreadRef = new AtomicReference<>(null);
        this.fragmentContext = executorFragmentContext;
        this.statusReporter = fragmentStatusReporter;
        this.fragment = planFragment;
        this.rootOperator = fragmentRoot;
        this.fragmentName = QueryIdHelper.getQueryIdentifier(executorFragmentContext.getHandle());
        executorFragmentContext.setExecutorState(new ExecutorStateImpl(this, null));
    }

    public String toString() {
        return "FragmentExecutor [fragmentContext=" + this.fragmentContext + ", fragmentState=" + this.fragmentState + "]";
    }

    public BitControl.FragmentStatus getStatus() {
        if (this.fragmentState.get() == UserBitShared.FragmentState.RUNNING) {
            return this.statusReporter.getStatus(UserBitShared.FragmentState.RUNNING);
        }
        return null;
    }

    public void cancel() {
        if (!this.myThreadRef.compareAndSet(null, Thread.currentThread())) {
            this.eventProcessor.cancel();
        } else {
            this.eventProcessor.cancelAndFinish();
            this.eventProcessor.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanup(UserBitShared.FragmentState fragmentState) {
        closeOutResources();
        updateState(fragmentState);
        sendFinalState();
    }

    public synchronized void unpause() {
        this.fragmentContext.getExecutionControls().unpauseAll();
    }

    public void receivingFragmentFinished(ExecProtos.FragmentHandle fragmentHandle) {
        this.eventProcessor.receiverFinished(fragmentHandle);
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread currentThread = Thread.currentThread();
        if (this.myThreadRef.compareAndSet(null, currentThread)) {
            String name = currentThread.getName();
            ExecProtos.FragmentHandle handle = this.fragmentContext.getHandle();
            ClusterCoordinator clusterCoordinator = this.fragmentContext.getClusterCoordinator();
            FragmentDrillbitStatusListener fragmentDrillbitStatusListener = new FragmentDrillbitStatusListener(this, null);
            try {
                try {
                    try {
                        try {
                            currentThread.setName(QueryIdHelper.getExecutorThreadName(handle));
                            this.root = ImplCreator.getExec(this.fragmentContext, this.rootOperator != null ? this.rootOperator : this.fragmentContext.getPlanReader().readFragmentRoot(this.fragment.getFragmentJson()));
                            if (this.root == null) {
                                this.eventProcessor.terminate();
                                Thread.interrupted();
                                cleanup(UserBitShared.FragmentState.FINISHED);
                                clusterCoordinator.removeDrillbitStatusListener(fragmentDrillbitStatusListener);
                                currentThread.setName(name);
                                return;
                            }
                            clusterCoordinator.addDrillbitStatusListener(fragmentDrillbitStatusListener);
                            updateState(UserBitShared.FragmentState.RUNNING);
                            this.eventProcessor.start();
                            injector.injectPause(this.fragmentContext.getExecutionControls(), "fragment-running", logger);
                            CoordinationProtos.DrillbitEndpoint endpoint = this.fragmentContext.getEndpoint();
                            logger.debug("Starting fragment {}:{} on {}:{}", new Object[]{Integer.valueOf(handle.getMajorFragmentId()), Integer.valueOf(handle.getMinorFragmentId()), endpoint.getAddress(), Integer.valueOf(endpoint.getUserPort())});
                            (this.fragmentContext.isImpersonationEnabled() ? ImpersonationUtil.createProxyUgi(this.fragmentContext.getQueryUserName()) : ImpersonationUtil.getProcessUserUGI()).doAs(() -> {
                                injector.injectChecked(this.fragmentContext.getExecutionControls(), "fragment-execution", IOException.class);
                                while (shouldContinue()) {
                                    while (true) {
                                        ExecProtos.FragmentHandle poll = this.receiverFinishedQueue.poll();
                                        if (poll == null) {
                                            break;
                                        }
                                        this.root.receivingFragmentFinished(poll);
                                    }
                                    if (!this.root.next()) {
                                        return null;
                                    }
                                }
                                return null;
                            });
                            this.eventProcessor.terminate();
                            Thread.interrupted();
                            cleanup(UserBitShared.FragmentState.FINISHED);
                            clusterCoordinator.removeDrillbitStatusListener(fragmentDrillbitStatusListener);
                            currentThread.setName(name);
                        } catch (OutOfMemoryError | OutOfMemoryException e) {
                            if (FailureUtils.isDirectMemoryOOM(e)) {
                                this.root.dumpBatches(e);
                                fail(UserException.memoryError(e).build(logger));
                            } else {
                                FailureUtils.unrecoverableFailure(e, "Unable to handle out of memory condition in FragmentExecutor.", -1);
                            }
                            this.eventProcessor.terminate();
                            Thread.interrupted();
                            cleanup(UserBitShared.FragmentState.FINISHED);
                            clusterCoordinator.removeDrillbitStatusListener(fragmentDrillbitStatusListener);
                            currentThread.setName(name);
                        }
                    } catch (Throwable th) {
                        if (this.root != null) {
                            this.root.dumpBatches(th);
                        }
                        fail(th);
                        this.eventProcessor.terminate();
                        Thread.interrupted();
                        cleanup(UserBitShared.FragmentState.FINISHED);
                        clusterCoordinator.removeDrillbitStatusListener(fragmentDrillbitStatusListener);
                        currentThread.setName(name);
                    }
                } catch (InterruptedException e2) {
                    logger.trace("Interrupted root: {}", this.root, e2);
                    this.eventProcessor.terminate();
                    Thread.interrupted();
                    cleanup(UserBitShared.FragmentState.FINISHED);
                    clusterCoordinator.removeDrillbitStatusListener(fragmentDrillbitStatusListener);
                    currentThread.setName(name);
                } catch (QueryCancelledException e3) {
                    this.eventProcessor.terminate();
                    Thread.interrupted();
                    cleanup(UserBitShared.FragmentState.FINISHED);
                    clusterCoordinator.removeDrillbitStatusListener(fragmentDrillbitStatusListener);
                    currentThread.setName(name);
                }
            } catch (Throwable th2) {
                this.eventProcessor.terminate();
                Thread.interrupted();
                cleanup(UserBitShared.FragmentState.FINISHED);
                clusterCoordinator.removeDrillbitStatusListener(fragmentDrillbitStatusListener);
                currentThread.setName(name);
                throw th2;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean shouldContinue() {
        return (isCompleted() || UserBitShared.FragmentState.CANCELLATION_REQUESTED == this.fragmentState.get()) ? false : true;
    }

    private boolean isCompleted() {
        return isTerminal(this.fragmentState.get());
    }

    private void sendFinalState() {
        UserBitShared.FragmentState fragmentState = this.fragmentState.get();
        if (fragmentState == UserBitShared.FragmentState.FAILED) {
            ExecProtos.FragmentHandle handle = getContext().getHandle();
            this.statusReporter.fail(UserException.systemError(this.deferredException.getAndClear()).addIdentity(getContext().getEndpoint()).addContext("Fragment", handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId()).build(logger));
        } else {
            this.statusReporter.stateChanged(fragmentState);
        }
        this.statusReporter.close();
    }

    private void closeOutResources() {
        try {
            if (this.root != null) {
                this.root.close();
            }
        } catch (Exception e) {
            fail(e);
        }
        this.fragmentContext.close();
    }

    private void warnStateChange(UserBitShared.FragmentState fragmentState, UserBitShared.FragmentState fragmentState2) {
        logger.warn(this.fragmentName + ": Ignoring unexpected state transition {} --> {}", fragmentState.name(), fragmentState2.name());
    }

    private void errorStateChange(UserBitShared.FragmentState fragmentState, UserBitShared.FragmentState fragmentState2) {
        throw new StateTransitionException(String.format("%s: Invalid state transition %s --> %s", this.fragmentName, fragmentState.name(), fragmentState2.name()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Failed to find 'out' block for switch in B:2:0x0033. Please report as an issue. */
    public synchronized boolean updateState(UserBitShared.FragmentState fragmentState) {
        UserBitShared.FragmentState fragmentState2 = this.fragmentState.get();
        logger.info(this.fragmentName + ": State change requested {} --> {}", fragmentState2, fragmentState);
        switch (AnonymousClass1.$SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[fragmentState.ordinal()]) {
            case 3:
                if (fragmentState2 == UserBitShared.FragmentState.AWAITING_ALLOCATION) {
                    this.fragmentState.set(fragmentState);
                    this.statusReporter.stateChanged(fragmentState);
                    return true;
                }
                errorStateChange(fragmentState2, fragmentState);
            case 1:
            case 2:
            case 7:
            default:
                errorStateChange(fragmentState2, fragmentState);
                throw new IllegalStateException();
            case 4:
                switch (AnonymousClass1.$SwitchMap$org$apache$drill$exec$proto$UserBitShared$FragmentState[fragmentState2.ordinal()]) {
                    case 1:
                    case 2:
                    case 3:
                        this.fragmentState.set(fragmentState);
                        this.statusReporter.stateChanged(fragmentState);
                        return true;
                    default:
                        warnStateChange(fragmentState2, fragmentState);
                        return false;
                }
            case 5:
                if (fragmentState2 == UserBitShared.FragmentState.CANCELLATION_REQUESTED) {
                    fragmentState = UserBitShared.FragmentState.CANCELLED;
                } else if (fragmentState2 == UserBitShared.FragmentState.FAILED) {
                    fragmentState = UserBitShared.FragmentState.FAILED;
                }
            case DrillParserImplConstants.SCH_NUM /* 6 */:
                if (!isTerminal(fragmentState2)) {
                    this.fragmentState.set(fragmentState);
                    return true;
                }
                if (fragmentState2 == UserBitShared.FragmentState.FAILED) {
                    return false;
                }
                if (fragmentState2 == UserBitShared.FragmentState.CANCELLED && fragmentState == UserBitShared.FragmentState.FAILED) {
                    this.fragmentState.set(UserBitShared.FragmentState.FAILED);
                    return true;
                }
                warnStateChange(fragmentState2, fragmentState);
                return false;
        }
    }

    private boolean isTerminal(UserBitShared.FragmentState fragmentState) {
        return fragmentState == UserBitShared.FragmentState.CANCELLED || fragmentState == UserBitShared.FragmentState.FAILED || fragmentState == UserBitShared.FragmentState.FINISHED;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fail(Throwable th) {
        this.deferredException.addThrowable(th);
        updateState(UserBitShared.FragmentState.FAILED);
    }

    public ExecutorFragmentContext getContext() {
        return this.fragmentContext;
    }
}
