package org.apache.drill.exec.work.batch;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.planner.sql.parser.impl.DrillParserImplConstants;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.RequestHandler;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserRpcException;
import org.apache.drill.exec.rpc.control.ControlConnection;
import org.apache.drill.exec.rpc.control.ControlRpcConfig;
import org.apache.drill.exec.rpc.control.CustomHandlerRegistry;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/batch/ControlMessageHandler.class */
/**
 * Handles requests arriving on a {@link ControlConnection}: each request is identified by an
 * integer RPC type, decoded from its message buffer, applied to the local {@link WorkManager.WorkerBee}
 * (or its {@link DrillbitContext}), and answered through the supplied {@link ResponseSender}.
 *
 * <p>Custom messages (RPC type 17) are delegated to the {@link CustomHandlerRegistry} exposed by
 * {@link #getHandlerRegistry()}.
 */
public class ControlMessageHandler implements RequestHandler<ControlConnection> {
    private static final Logger logger = LoggerFactory.getLogger(ControlMessageHandler.class);

    /** Message prefix used to recognise an {@link OutOfMemoryError} that is reported back to the caller. */
    private static final String DIRECT_MEMORY_OOM_PREFIX = "Direct buffer";

    private final WorkManager.WorkerBee bee;
    private final CustomHandlerRegistry handlerRegistry = new CustomHandlerRegistry();

    /**
     * @param workerBee the worker used to look up foremen and fragment runners and to reach the
     *                  drillbit context
     */
    public ControlMessageHandler(WorkManager.WorkerBee workerBee) {
        this.bee = workerBee;
    }

    /**
     * Decodes and executes a single control request, then sends the response.
     *
     * <p>NOTE(review): the numeric case labels presumably correspond to the {@code *_VALUE}
     * constants of {@code BitControl.RpcType}; confirm against the protobuf definition and replace
     * the literals with those constants.
     *
     * @param connection the connection the request arrived on (not used by this handler)
     * @param rpcType    numeric identifier of the request type
     * @param pBody      buffer from which the request message is parsed
     * @param dBody      secondary buffer; only forwarded (as a {@link DrillBuf}) to custom handlers
     * @param sender     used to send the response for the request
     * @throws RpcException if the RPC type is not handled here, if a query status is requested for
     *                      a query with no foreman on this node, or if processing the request fails
     */
    public void handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException {
        switch (rpcType) {
            case 3: {
                // Start every fragment contained in the request; a failure on any fragment
                // propagates before the OK is sent.
                final BitControl.InitializeFragments fragments = RpcBus.get(pBody, BitControl.InitializeFragments.PARSER);
                final DrillbitContext drillbitContext = this.bee.getContext();
                for (int idx = 0; idx < fragments.getFragmentCount(); idx++) {
                    startNewFragment(fragments.getFragment(idx), drillbitContext);
                }
                sender.send(ControlRpcConfig.OK);
                break;
            }
            case 6: {
                // Cancel a fragment.
                final ExecProtos.FragmentHandle handle = RpcBus.get(pBody, ExecProtos.FragmentHandle.PARSER);
                cancelFragment(handle);
                sender.send(ControlRpcConfig.OK);
                break;
            }
            case 7: {
                // A receiver reports that it is finished with a sending fragment.
                final BitControl.FinishedReceiver finished = RpcBus.get(pBody, BitControl.FinishedReceiver.PARSER);
                receivingFragmentFinished(finished);
                sender.send(ControlRpcConfig.OK);
                break;
            }
            case 8: {
                // Forward a fragment status update to the work bus.
                final BitControl.FragmentStatus status = RpcBus.get(pBody, BitControl.FragmentStatus.PARSER);
                this.bee.getContext().getWorkBus().statusUpdate(status);
                sender.send(ControlRpcConfig.OK);
                break;
            }
            case 10: {
                // Return the query profile of a query whose foreman runs on this node.
                final UserBitShared.QueryId queryId = RpcBus.get(pBody, UserBitShared.QueryId.PARSER);
                final Foreman foreman = this.bee.getForemanForQueryId(queryId);
                if (foreman == null) {
                    throw new RpcException("Query not running on node.");
                }
                sender.send(new Response(BitControl.RpcType.RESP_QUERY_STATUS, foreman.getQueryManager().getQueryProfile(), new ByteBuf[0]));
                break;
            }
            case 15: {
                // Cancel a query; answers FAIL (rather than throwing) when no foreman is found.
                final UserBitShared.QueryId queryId = RpcBus.get(pBody, UserBitShared.QueryId.PARSER);
                final Foreman foreman = this.bee.getForemanForQueryId(queryId);
                if (foreman == null) {
                    sender.send(ControlRpcConfig.FAIL);
                } else {
                    foreman.cancel();
                    sender.send(ControlRpcConfig.OK);
                }
                break;
            }
            case 16: {
                // Resume (unpause) a fragment.
                final ExecProtos.FragmentHandle handle = RpcBus.get(pBody, ExecProtos.FragmentHandle.PARSER);
                resumeFragment(handle);
                sender.send(ControlRpcConfig.OK);
                break;
            }
            case 17: {
                // Custom message: the registry produces the response to send.
                final BitControl.CustomMessage customMessage = RpcBus.get(pBody, BitControl.CustomMessage.PARSER);
                sender.send(this.handlerRegistry.handle(customMessage, (DrillBuf) dBody));
                break;
            }
            default:
                // Every other RPC type is rejected.
                throw new RpcException("Not yet supported.");
        }
    }

    /**
     * Builds the execution objects for a plan fragment and registers them: leaf fragments are
     * handed to the worker as fragment runners, all others are registered with the work bus through
     * a {@link NonRootFragmentManager}.
     *
     * @param planFragment    the fragment to start
     * @param drillbitContext context used to build the fragment context and to reach the work bus
     * @throws UserRpcException if setting up or registering the fragment fails, or if an
     *                          {@link OutOfMemoryError} whose message starts with "Direct buffer" occurs
     */
    private void startNewFragment(BitControl.PlanFragment planFragment, DrillbitContext drillbitContext) throws UserRpcException {
        // The placeholder is required; without it SLF4J silently ignores the argument.
        logger.debug("Received remote fragment start instruction: {}", planFragment);
        try {
            final FragmentContext fragmentContext = new FragmentContext(drillbitContext, planFragment, drillbitContext.getFunctionImplementationRegistry());
            final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext);
            final FragmentExecutor executor = new FragmentExecutor(fragmentContext, planFragment, statusReporter);
            if (planFragment.getLeafFragment()) {
                this.bee.addFragmentRunner(executor);
            } else {
                drillbitContext.getWorkBus().addFragmentManager(new NonRootFragmentManager(planFragment, executor, statusReporter));
            }
        } catch (ExecutionSetupException e) {
            throw new UserRpcException(drillbitContext.getEndpoint(), "Failed to create fragment context", e);
        } catch (Exception e) {
            throw new UserRpcException(drillbitContext.getEndpoint(), "Failure while trying to start remote fragment", e);
        } catch (OutOfMemoryError e) {
            // getMessage() may be null; guard so the original error is rethrown instead of
            // being masked by a NullPointerException.
            final String message = e.getMessage();
            if (message != null && message.startsWith(DIRECT_MEMORY_OOM_PREFIX)) {
                throw new UserRpcException(drillbitContext.getEndpoint(), "Out of direct memory while trying to start remote fragment", e);
            }
            throw e;
        }
    }

    /**
     * Cancels a fragment. First asks the work bus to remove the fragment's manager; if that does
     * not succeed, cancels the matching fragment runner. A request for an unknown fragment is
     * logged and dropped.
     *
     * @param fragmentHandle identifies the fragment to cancel
     * @return always {@link Acks#OK}
     */
    private GeneralRPCProtos.Ack cancelFragment(ExecProtos.FragmentHandle fragmentHandle) {
        if (this.bee.getContext().getWorkBus().removeFragmentManager(fragmentHandle, true)) {
            return Acks.OK;
        }
        final FragmentExecutor runner = this.bee.getFragmentRunner(fragmentHandle);
        if (runner == null) {
            logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getQueryIdentifier(fragmentHandle));
        } else {
            runner.cancel();
        }
        return Acks.OK;
    }

    /**
     * Unpauses a fragment, preferring its fragment manager on the work bus and falling back to
     * its fragment runner. A request for an unknown fragment is logged and dropped.
     *
     * @param fragmentHandle identifies the fragment to resume
     * @return always {@link Acks#OK}
     */
    private GeneralRPCProtos.Ack resumeFragment(ExecProtos.FragmentHandle fragmentHandle) {
        final FragmentManager manager = this.bee.getContext().getWorkBus().getFragmentManager(fragmentHandle);
        if (manager != null) {
            manager.unpause();
            return Acks.OK;
        }
        final FragmentExecutor runner = this.bee.getFragmentRunner(fragmentHandle);
        if (runner == null) {
            logger.warn("Dropping request to resume fragment. {} does not exist.", QueryIdHelper.getQueryIdentifier(fragmentHandle));
        } else {
            runner.unpause();
        }
        return Acks.OK;
    }

    /**
     * Notifies the sending fragment that one of its receivers has finished, preferring the
     * sender's fragment manager on the work bus and falling back to its fragment runner. If
     * neither exists the request is logged and dropped.
     *
     * @param finishedReceiver carries the sender and receiver fragment handles
     * @return always {@link Acks#OK}
     */
    private GeneralRPCProtos.Ack receivingFragmentFinished(BitControl.FinishedReceiver finishedReceiver) {
        final FragmentManager manager = this.bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
        if (manager != null) {
            manager.receivingFragmentFinished(finishedReceiver.getReceiver());
            return Acks.OK;
        }
        final FragmentExecutor runner = this.bee.getFragmentRunner(finishedReceiver.getSender());
        if (runner != null) {
            runner.receivingFragmentFinished(finishedReceiver.getReceiver());
        } else {
            logger.warn("Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.", QueryIdHelper.getQueryIdentifier(finishedReceiver.getSender()), QueryIdHelper.getQueryIdentifier(finishedReceiver.getReceiver()));
        }
        return Acks.OK;
    }

    /**
     * @return the registry that handles custom control messages
     */
    public CustomHandlerRegistry getHandlerRegistry() {
        return this.handlerRegistry;
    }
}
