package org.apache.drill.exec.work.foreman;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContextImpl;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/foreman/FragmentsRunner.class */
public class FragmentsRunner {
    /** Class logger; assigned in the static initializer at the end of this class. */
    private static final Logger logger;
    /** Injector used for the "send-fragments" injection site in {@code setupNonRootFragments}; assigned in the static initializer. */
    private static final ControlsInjector injector;
    /** Receives the root fragment's executor when the root fragment can start immediately. */
    private final WorkManager.WorkerBee bee;
    /** Client connection handed to the root fragment's {@code FragmentContextImpl}. */
    private final UserClientConnection initiatingClient;
    /** Source of the work bus, cluster coordinator, controller and option manager used while submitting. */
    private final DrillbitContext drillbitContext;
    /** Owning foreman: supplies the query id, query manager and query context, and receives FAILED events. */
    private final Foreman foreman;
    /** Non-root fragments to submit; populated by {@code setFragmentsInfo} and required by {@code submit}. */
    private List<BitControl.PlanFragment> planFragments;
    /** Root fragment of the plan; populated by {@code setFragmentsInfo} and required by {@code submit}. */
    private BitControl.PlanFragment rootPlanFragment;
    /** Root operator passed to the root fragment's executor; populated by {@code setFragmentsInfo}. */
    private FragmentRoot rootOperator;
    // Decompiler-exposed flag behind the class's assertions: set to the negation of
    // desiredAssertionStatus() in the static initializer and checked explicitly in submit().
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/foreman/FragmentsRunner$FragmentSubmitFailures.class */
    /**
     * Collects the RPC failures reported while fragments are being sent to remote endpoints.
     *
     * <p>Failures are appended through {@link #addFailure} and read back directly from
     * {@link #submissionExceptions} by the enclosing class once it has finished waiting
     * for the outstanding submissions.
     */
    public static class FragmentSubmitFailures {
        /** Recorded failures, in the order they were added. */
        final List<SubmissionException> submissionExceptions;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/drill/exec/work/foreman/FragmentsRunner$FragmentSubmitFailures$SubmissionException.class */
        /** Pairs the endpoint a submission was addressed to with the RPC exception it produced. */
        public static class SubmissionException {
            final CoordinationProtos.DrillbitEndpoint drillbitEndpoint;
            final RpcException rpcException;

            SubmissionException(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, RpcException rpcException) {
                this.drillbitEndpoint = drillbitEndpoint;
                this.rpcException = rpcException;
            }
        }

        private FragmentSubmitFailures() {
            // Explicit type argument instead of the raw LinkedList the field used to be given.
            this.submissionExceptions = new LinkedList<SubmissionException>();
        }

        /**
         * Records one failed submission.
         *
         * <p>Synchronized because this is invoked from RPC outcome callbacks while the
         * submitting thread waits on a latch; callbacks for different endpoints may
         * presumably run on different threads (TODO confirm against the RPC layer), and
         * {@link LinkedList} does not tolerate concurrent appends.
         *
         * @param drillbitEndpoint endpoint the failed submission was sent to
         * @param rpcException     failure reported for that submission
         */
        synchronized void addFailure(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, RpcException rpcException) {
            this.submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/foreman/FragmentsRunner$FragmentSubmitListener.class */
    public class FragmentSubmitListener extends EndpointListener<GeneralRPCProtos.Ack, BitControl.InitializeFragments> {
        private final CountDownLatch latch;
        private final FragmentSubmitFailures fragmentSubmitFailures;

        public FragmentSubmitListener(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, BitControl.InitializeFragments initializeFragments, CountDownLatch countDownLatch, FragmentSubmitFailures fragmentSubmitFailures) {
            super(drillbitEndpoint, initializeFragments);
            Preconditions.checkState((countDownLatch == null) == (fragmentSubmitFailures == null));
            this.latch = countDownLatch;
            this.fragmentSubmitFailures = fragmentSubmitFailures;
        }

        public void success(GeneralRPCProtos.Ack ack, ByteBuf byteBuf) {
            if (this.latch != null) {
                this.latch.countDown();
            }
        }

        public void failed(RpcException rpcException) {
            if (this.latch != null) {
                this.fragmentSubmitFailures.addFailure(this.endpoint, rpcException);
                this.latch.countDown();
            } else {
                FragmentsRunner.logger.debug("Failure while sending fragment.  Stopping query.", rpcException);
                FragmentsRunner.this.foreman.addToEventQueue(UserBitShared.QueryResult.QueryState.FAILED, rpcException);
            }
        }

        public void interrupted(InterruptedException interruptedException) {
            FragmentsRunner.logger.error("Interrupted while waiting for the RPC outcome of fragment submission.", interruptedException);
            failed(new RpcException("Interrupted while waiting for the RPC outcome of fragment submission.", interruptedException));
        }
    }

    /**
     * Creates a runner bound to one foreman.
     *
     * @param workerBee            receives the root fragment executor when it can start right away
     * @param userClientConnection connection of the client that initiated the query
     * @param drillbitContext      context providing the work bus, coordinator, controller and options
     * @param foreman              foreman on whose behalf fragments are submitted
     */
    public FragmentsRunner(WorkManager.WorkerBee workerBee, UserClientConnection userClientConnection, DrillbitContext drillbitContext, Foreman foreman) {
        this.foreman = foreman;
        this.drillbitContext = drillbitContext;
        this.initiatingClient = userClientConnection;
        this.bee = workerBee;
    }

    /** @return the worker bee supplied at construction time */
    public WorkManager.WorkerBee getBee() {
        return bee;
    }

    /**
     * Stores the fragments that a later call to {@code submit()} will launch.
     *
     * @param list         the non-root plan fragments
     * @param planFragment the root plan fragment
     * @param fragmentRoot the root operator for the root fragment
     */
    public void setFragmentsInfo(List<BitControl.PlanFragment> list, BitControl.PlanFragment planFragment, FragmentRoot fragmentRoot) {
        rootOperator = fragmentRoot;
        rootPlanFragment = planFragment;
        planFragments = list;
    }

    /**
     * Registers the query's status listeners and then launches all fragments:
     * the root fragment first, followed by the non-root fragments.
     *
     * <p>{@code setFragmentsInfo} must have been called beforehand; with assertions
     * enabled, missing fragment information or a root fragment that belongs to a
     * different query raises an {@link AssertionError}.
     *
     * @throws ExecutionSetupException if setting up the root or non-root fragments fails
     */
    public void submit() throws ExecutionSetupException {
        // The explicit $assertionsDisabled guards are the decompiled form of `assert` statements.
        if (!$assertionsDisabled && this.planFragments == null) {
            throw new AssertionError("planFragments has not been set");
        }
        if (!$assertionsDisabled && this.rootPlanFragment == null) {
            throw new AssertionError("rootPlanFragment has not been set");
        }
        if (!$assertionsDisabled && this.rootOperator == null) {
            throw new AssertionError("rootOperator has not been set");
        }
        UserBitShared.QueryId queryId = this.foreman.getQueryId();
        if (!$assertionsDisabled) {
            // Compare by value: two QueryId messages describing the same query need not be
            // the same object, so an identity check could fail spuriously.
            UserBitShared.QueryId rootQueryId = this.rootPlanFragment.getHandle().getQueryId();
            boolean sameQuery = (queryId == null) ? (rootQueryId == null) : queryId.equals(rootQueryId);
            if (!sameQuery) {
                throw new AssertionError("root fragment does not belong to the foreman's query");
            }
        }
        QueryManager queryManager = this.foreman.getQueryManager();
        // Listeners are registered before any fragment is started.
        this.drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
        this.drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
        logger.debug("Submitting fragments to run.");
        setupRootFragment(this.rootPlanFragment, this.rootOperator);
        setupNonRootFragments(this.planFragments);
        logger.debug("Fragments running.");
    }

    /**
     * Builds the execution objects for the root fragment and either starts it
     * immediately or parks its manager on the work bus.
     *
     * @param planFragment the root plan fragment
     * @param fragmentRoot the root operator to execute
     * @throws ExecutionSetupException if any of the execution objects cannot be created
     */
    private void setupRootFragment(BitControl.PlanFragment planFragment, FragmentRoot fragmentRoot) throws ExecutionSetupException {
        final QueryManager statusTracker = this.foreman.getQueryManager();
        final FragmentContextImpl rootContext = new FragmentContextImpl(
                this.drillbitContext,
                planFragment,
                this.foreman.getQueryContext(),
                this.initiatingClient,
                this.drillbitContext.getFunctionImplementationRegistry());
        final FragmentStatusReporter reporter = new FragmentStatusReporter(rootContext);
        final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, planFragment, reporter, fragmentRoot);
        // NOTE(review): the manager is created before the isBuffersDone() check even though
        // only one branch registers it; presumably its construction affects the context's
        // buffer state - confirm before reordering.
        final RootFragmentManager rootManager = new RootFragmentManager(planFragment, rootRunner, reporter);

        statusTracker.addFragmentStatusTracker(planFragment, true);

        if (!rootContext.isBuffersDone()) {
            // Buffers are not done yet: register the manager on the work bus instead of running now.
            this.drillbitContext.getWorkBus().addFragmentManager(rootManager);
            return;
        }
        this.bee.addFragmentRunner(rootRunner);
    }

    /**
     * Registers status tracking for every non-root fragment and sends the fragments
     * to their assigned endpoints.
     *
     * <p>Intermediate (non-leaf) fragments are sent first through
     * {@code scheduleIntermediateFragments}, which waits for the endpoints to respond.
     * Leaf fragments are sent afterwards without a latch, so their failures are reported
     * to the foreman by the submit listener. Presumably this ordering ensures that
     * intermediate fragments are set up before leaves start producing data - confirm.
     *
     * @param collection the non-root plan fragments; nothing happens when it is empty
     * @throws ExecutionSetupException if the "send-fragments" injection site raises
     *                                 a {@code ForemanException}
     */
    private void setupNonRootFragments(Collection<BitControl.PlanFragment> collection) throws ExecutionSetupException {
        if (collection.isEmpty()) {
            return;
        }
        // Fragments grouped by the endpoint they are assigned to. Parameterized (not raw)
        // so that keySet()/get() are typed for the loop and the calls below.
        Multimap<CoordinationProtos.DrillbitEndpoint, BitControl.PlanFragment> leafFragments = ArrayListMultimap.create();
        Multimap<CoordinationProtos.DrillbitEndpoint, BitControl.PlanFragment> intermediateFragments = ArrayListMultimap.create();
        for (BitControl.PlanFragment planFragment : collection) {
            if (logger.isTraceEnabled()) {
                logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(), planFragment.getFragmentJson());
            }
            this.foreman.getQueryManager().addFragmentStatusTracker(planFragment, false);
            if (planFragment.getLeafFragment()) {
                leafFragments.put(planFragment.getAssignment(), planFragment);
            } else {
                intermediateFragments.put(planFragment.getAssignment(), planFragment);
            }
        }
        // Blocks until every endpoint has answered (or throws on timeout/failure).
        scheduleIntermediateFragments(intermediateFragments);
        injector.injectChecked(this.foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class);
        // Leaf fragments are fire-and-forget: no latch, no failure collector.
        for (CoordinationProtos.DrillbitEndpoint drillbitEndpoint : leafFragments.keySet()) {
            sendRemoteFragments(drillbitEndpoint, leafFragments.get(drillbitEndpoint), null, null);
        }
    }

    /**
     * Bundles the given fragments into one {@code InitializeFragments} request and
     * sends it to a single endpoint over that endpoint's control tunnel.
     *
     * @param drillbitEndpoint       endpoint that should run the fragments
     * @param collection             fragments assigned to that endpoint
     * @param countDownLatch         latch counted down when the RPC outcome arrives, or {@code null}
     * @param fragmentSubmitFailures collector for a failed outcome, or {@code null};
     *                               must be {@code null} exactly when the latch is
     */
    private void sendRemoteFragments(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, Collection<BitControl.PlanFragment> collection, CountDownLatch countDownLatch, FragmentSubmitFailures fragmentSubmitFailures) {
        final Controller rpcController = this.drillbitContext.getController();

        final BitControl.InitializeFragments.Builder requestBuilder = BitControl.InitializeFragments.newBuilder();
        for (BitControl.PlanFragment fragment : collection) {
            requestBuilder.addFragment(fragment);
        }
        final BitControl.InitializeFragments request = requestBuilder.build();

        logger.debug("Sending remote fragments to node: {}\nData: {}", drillbitEndpoint, request);
        rpcController
                .getTunnel(drillbitEndpoint)
                .sendFragments(new FragmentSubmitListener(drillbitEndpoint, request, countDownLatch, fragmentSubmitFailures), request);
    }

    /**
     * Sends the intermediate fragments to their endpoints and waits until every
     * endpoint has reported an outcome.
     *
     * <p>One request is sent per endpoint. The wait is bounded by the
     * {@code FRAG_RUNNER_RPC_TIMEOUT} option multiplied by the number of endpoints
     * (time unit as expected by {@code ExtendedLatch.awaitUninterruptibly} - not
     * visible here).
     *
     * @param multimap intermediate fragments grouped by assigned endpoint
     * @throws UserException (unchecked) if the wait times out, or if at least one
     *                       endpoint reported a failure; in the latter case the first
     *                       recorded RPC exception is used as the cause
     */
    private void scheduleIntermediateFragments(Multimap<CoordinationProtos.DrillbitEndpoint, BitControl.PlanFragment> multimap) {
        int endpointCount = multimap.keySet().size();
        ExtendedLatch responseLatch = new ExtendedLatch(endpointCount);
        FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
        for (CoordinationProtos.DrillbitEndpoint drillbitEndpoint : multimap.keySet()) {
            sendRemoteFragments(drillbitEndpoint, multimap.get(drillbitEndpoint), responseLatch, fragmentSubmitFailures);
        }
        long timeout = this.drillbitContext.getOptionManager().getLong(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT) * endpointCount;
        if (endpointCount > 0 && !responseLatch.awaitUninterruptibly(timeout)) {
            throw UserException.connectionError().message("Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. Sent %d and only heard response back from %d nodes.", new Object[]{Long.valueOf(timeout), Integer.valueOf(endpointCount), Long.valueOf(endpointCount - responseLatch.getCount())}).build(logger);
        }
        List<FragmentSubmitFailures.SubmissionException> failures = fragmentSubmitFailures.submissionExceptions;
        if (failures.isEmpty()) {
            return;
        }
        throw UserException.connectionError(failures.get(0).rpcException).message("Error setting up remote intermediate fragment execution", new Object[0]).addContext("Nodes with failures", failedNodeAddresses(failures)).build(logger);
    }

    /** Joins the addresses of the distinct failed endpoints with ", ", in first-seen order. */
    private static String failedNodeAddresses(List<FragmentSubmitFailures.SubmissionException> failures) {
        HashSet<CoordinationProtos.DrillbitEndpoint> seenEndpoints = Sets.newHashSet();
        StringBuilder addresses = new StringBuilder();
        boolean first = true;
        for (FragmentSubmitFailures.SubmissionException failure : failures) {
            CoordinationProtos.DrillbitEndpoint endpoint = failure.drillbitEndpoint;
            // Each endpoint is listed once, even if it produced several failures.
            if (!seenEndpoints.add(endpoint)) {
                continue;
            }
            if (first) {
                first = false;
            } else {
                addresses.append(", ");
            }
            addresses.append(endpoint.getAddress());
        }
        return addresses.toString();
    }

    static {
        $assertionsDisabled = !FragmentsRunner.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(FragmentsRunner.class);
        injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class);
    }
}
