package org.apache.drill.exec.rpc.data;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.RequestHandler;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/drill/exec/rpc/data/DataServerRequestHandler.class */
public class DataServerRequestHandler implements RequestHandler<DataServerConnection> {
    private static final Logger logger = LoggerFactory.getLogger(DataServerRequestHandler.class);
    private final WorkEventBus workBus;
    private final WorkManager.WorkerBee bee;

    public DataServerRequestHandler(WorkEventBus workEventBus, WorkManager.WorkerBee workerBee) {
        this.workBus = workEventBus;
        this.bee = workerBee;
    }

    public void handle(DataServerConnection dataServerConnection, int i, ByteBuf byteBuf, ByteBuf byteBuf2, ResponseSender responseSender) throws RpcException {
        switch (i) {
            case 3:
                handleRecordBatchRequest(byteBuf, byteBuf2, responseSender);
                return;
            case 5:
                handleRuntimeFilterRequest(byteBuf, byteBuf2, responseSender);
                return;
            default:
                throw new RpcException("Not yet supported.");
        }
    }

    private void handleRecordBatchRequest(ByteBuf byteBuf, ByteBuf byteBuf2, ResponseSender responseSender) throws RpcException {
        BitData.FragmentRecordBatch fragmentRecordBatch = (BitData.FragmentRecordBatch) RpcBus.get(byteBuf, BitData.FragmentRecordBatch.PARSER);
        AckSender ackSender = new AckSender(responseSender);
        ackSender.increment();
        try {
            try {
                IncomingDataBatch incomingDataBatch = new IncomingDataBatch(fragmentRecordBatch, (DrillBuf) byteBuf2, ackSender);
                int receivingMinorFragmentIdCount = fragmentRecordBatch.getReceivingMinorFragmentIdCount();
                int nextInt = ThreadLocalRandom.current().nextInt(receivingMinorFragmentIdCount);
                submit(incomingDataBatch, nextInt, receivingMinorFragmentIdCount);
                submit(incomingDataBatch, 0, nextInt);
                ackSender.sendOk();
            } catch (IOException | FragmentSetupException e) {
                logger.error("Failure while getting fragment manager. {}", QueryIdHelper.getQueryIdentifiers(fragmentRecordBatch.getQueryId(), fragmentRecordBatch.getReceivingMajorFragmentId(), fragmentRecordBatch.getReceivingMinorFragmentIdList()), e);
                ackSender.clear();
                responseSender.send(new Response(BitData.RpcType.ACK, Acks.FAIL, new ByteBuf[0]));
                ackSender.sendOk();
            }
        } catch (Throwable th) {
            ackSender.sendOk();
            throw th;
        }
    }

    private void handleRuntimeFilterRequest(ByteBuf byteBuf, ByteBuf byteBuf2, ResponseSender responseSender) throws RpcException {
        BitData.RuntimeFilterBDef runtimeFilterBDef = (BitData.RuntimeFilterBDef) RpcBus.get(byteBuf, BitData.RuntimeFilterBDef.PARSER);
        if (byteBuf2 == null) {
            return;
        }
        List bloomFilterSizeInBytesList = runtimeFilterBDef.getBloomFilterSizeInBytesList();
        int size = bloomFilterSizeInBytesList.size();
        DrillBuf drillBuf = (DrillBuf) byteBuf2;
        DrillBuf[] drillBufArr = new DrillBuf[size];
        int i = 0;
        for (int i2 = 0; i2 < size; i2++) {
            int intValue = ((Integer) bloomFilterSizeInBytesList.get(i2)).intValue();
            drillBufArr[i2] = drillBuf.slice(i, intValue);
            i += intValue;
        }
        RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, drillBufArr);
        AckSender ackSender = new AckSender(responseSender);
        ackSender.increment();
        try {
            try {
                this.bee.receiveRuntimeFilter(runtimeFilterWritable);
                ackSender.sendOk();
            } catch (Exception e) {
                logger.error("error to solve received runtime filter, {}", QueryIdHelper.getQueryId(runtimeFilterBDef.getQueryId()), e);
                ackSender.clear();
                responseSender.send(new Response(BitData.RpcType.ACK, Acks.FAIL, new ByteBuf[0]));
                ackSender.sendOk();
            }
        } catch (Throwable th) {
            ackSender.sendOk();
            throw th;
        }
    }

    private void submit(IncomingDataBatch incomingDataBatch, int i, int i2) throws FragmentSetupException, IOException {
        for (int i3 = i; i3 < i2; i3++) {
            FragmentManager fragmentManager = this.workBus.getFragmentManager(getHandle(incomingDataBatch.getHeader(), i3));
            if (fragmentManager != null && fragmentManager.handle(incomingDataBatch)) {
                this.bee.startFragmentPendingRemote(fragmentManager);
            }
        }
    }

    private static ExecProtos.FragmentHandle getHandle(BitData.FragmentRecordBatch fragmentRecordBatch, int i) {
        return ExecProtos.FragmentHandle.newBuilder().setQueryId(fragmentRecordBatch.getQueryId()).setMajorFragmentId(fragmentRecordBatch.getReceivingMajorFragmentId()).setMinorFragmentId(fragmentRecordBatch.getReceivingMinorFragmentId(i)).build();
    }
}
