package org.apache.drill.exec.rpc.data;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.FragmentContextImpl;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/drill/exec/rpc/data/TestBitRpc.class */
public class TestBitRpc extends ExecTest {

    /* loaded from: input_file:org/apache/drill/exec/rpc/data/TestBitRpc$MockFragmentManager.class */
    public static class MockFragmentManager implements FragmentManager {
        private final BootStrapContext c;
        private int v;

        public MockFragmentManager(BootStrapContext bootStrapContext) {
            this.c = bootStrapContext;
        }

        public boolean handle(IncomingDataBatch incomingDataBatch) throws FragmentSetupException, IOException {
            try {
                this.v++;
                if (this.v % 10 == 0) {
                    Thread.sleep(3000L);
                }
            } catch (InterruptedException e) {
            }
            RawFragmentBatch newRawFragmentBatch = incomingDataBatch.newRawFragmentBatch(this.c.getAllocator());
            newRawFragmentBatch.sendOk();
            newRawFragmentBatch.release();
            return true;
        }

        public FragmentExecutor getRunnable() {
            return null;
        }

        public void cancel() {
        }

        public boolean isCancelled() {
            return false;
        }

        public void unpause() {
        }

        public boolean isWaiting() {
            return false;
        }

        public ExecProtos.FragmentHandle getHandle() {
            return null;
        }

        public FragmentContext getFragmentContext() {
            return null;
        }

        public void receivingFragmentFinished(ExecProtos.FragmentHandle fragmentHandle) {
        }
    }

    /* loaded from: input_file:org/apache/drill/exec/rpc/data/TestBitRpc$MockFragmentManagerWithDynamicCredit.class */
    public static class MockFragmentManagerWithDynamicCredit implements FragmentManager {
        private final BootStrapContext c;
        private int v;
        private int times = 0;

        public MockFragmentManagerWithDynamicCredit(BootStrapContext bootStrapContext) {
            this.c = bootStrapContext;
        }

        public boolean handle(IncomingDataBatch incomingDataBatch) throws FragmentSetupException, IOException {
            try {
                this.v++;
                if (this.v % 10 == 0) {
                    Thread.sleep(3000L);
                }
            } catch (InterruptedException e) {
            }
            this.times++;
            RawFragmentBatch newRawFragmentBatch = incomingDataBatch.newRawFragmentBatch(this.c.getAllocator());
            if (this.times > 3) {
                newRawFragmentBatch.sendOk(4);
            } else {
                newRawFragmentBatch.sendOk();
            }
            newRawFragmentBatch.release();
            return true;
        }

        public FragmentExecutor getRunnable() {
            return null;
        }

        public void cancel() {
        }

        public boolean isCancelled() {
            return false;
        }

        public void unpause() {
        }

        public boolean isWaiting() {
            return false;
        }

        public ExecProtos.FragmentHandle getHandle() {
            return null;
        }

        public FragmentContext getFragmentContext() {
            return null;
        }

        public void receivingFragmentFinished(ExecProtos.FragmentHandle fragmentHandle) {
        }
    }

    /* loaded from: input_file:org/apache/drill/exec/rpc/data/TestBitRpc$TimingOutcome.class */
    private class TimingOutcome implements RpcOutcomeListener<BitData.AckWithCredit> {
        private AtomicLong max;
        private Stopwatch watch = Stopwatch.createStarted();

        public TimingOutcome(AtomicLong atomicLong) {
            this.max = atomicLong;
        }

        public void failed(RpcException rpcException) {
            rpcException.printStackTrace();
        }

        public void success(BitData.AckWithCredit ackWithCredit, ByteBuf byteBuf) {
            long j;
            long elapsed = this.watch.elapsed(TimeUnit.MILLISECONDS);
            do {
                j = this.max.get();
                if (j >= elapsed) {
                    return;
                }
            } while (!this.max.compareAndSet(j, elapsed));
        }

        public void interrupted(InterruptedException interruptedException) {
        }
    }

    @Test
    public void testConnectionBackpressure() throws Exception {
        WorkManager.WorkerBee workerBee = (WorkManager.WorkerBee) Mockito.mock(WorkManager.WorkerBee.class);
        WorkEventBus workEventBus = (WorkEventBus) Mockito.mock(WorkEventBus.class);
        DrillConfig create = DrillConfig.create();
        BootStrapContext bootStrapContext = new BootStrapContext(create, SystemOptionManager.createDefaultOptionDefinitions(), ClassPathScanner.fromPrescan(create));
        Mockito.when(((FragmentContextImpl) Mockito.mock(FragmentContextImpl.class)).getAllocator()).thenReturn(bootStrapContext.getAllocator());
        Mockito.when(workEventBus.getFragmentManager((ExecProtos.FragmentHandle) Matchers.any(ExecProtos.FragmentHandle.class))).thenReturn(new MockFragmentManager(bootStrapContext));
        DataConnectionConfig dataConnectionConfig = new DataConnectionConfig(bootStrapContext.getAllocator(), bootStrapContext, new DataServerRequestHandler(workEventBus, workerBee));
        DataTunnel dataTunnel = new DataTunnel(new DataConnectionManager(CoordinationProtos.DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(new DataServer(dataConnectionConfig).bind(1234, true)).build(), dataConnectionConfig));
        AtomicLong atomicLong = new AtomicLong(0L);
        for (int i = 0; i < 40; i++) {
            System.currentTimeMillis();
            dataTunnel.sendRecordBatch(new TimingOutcome(atomicLong), new FragmentWritableBatch(false, UserBitShared.QueryId.getDefaultInstance(), 1, 1, 1, 1, getRandomBatch(bootStrapContext.getAllocator(), 5000)));
        }
        Assert.assertTrue(atomicLong.get() > 2700);
        Thread.sleep(5000L);
    }

    @Test
    public void testConnectionBackpressureWithDynamicCredit() throws Exception {
        WorkManager.WorkerBee workerBee = (WorkManager.WorkerBee) Mockito.mock(WorkManager.WorkerBee.class);
        WorkEventBus workEventBus = (WorkEventBus) Mockito.mock(WorkEventBus.class);
        DrillConfig create = DrillConfig.create();
        BootStrapContext bootStrapContext = new BootStrapContext(create, SystemOptionManager.createDefaultOptionDefinitions(), ClassPathScanner.fromPrescan(create));
        Mockito.when(((FragmentContextImpl) Mockito.mock(FragmentContextImpl.class)).getAllocator()).thenReturn(bootStrapContext.getAllocator());
        Mockito.when(workEventBus.getFragmentManager((ExecProtos.FragmentHandle) Matchers.any(ExecProtos.FragmentHandle.class))).thenReturn(new MockFragmentManagerWithDynamicCredit(bootStrapContext));
        DataConnectionConfig dataConnectionConfig = new DataConnectionConfig(bootStrapContext.getAllocator(), bootStrapContext, new DataServerRequestHandler(workEventBus, workerBee));
        DataTunnel dataTunnel = new DataTunnel(new DataConnectionManager(CoordinationProtos.DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(new DataServer(dataConnectionConfig).bind(1234, true)).build(), dataConnectionConfig));
        AtomicLong atomicLong = new AtomicLong(0L);
        for (int i = 0; i < 40; i++) {
            System.currentTimeMillis();
            dataTunnel.sendRecordBatch(new TimingOutcome(atomicLong), new FragmentWritableBatch(false, UserBitShared.QueryId.getDefaultInstance(), 1, 1, 1, 1, getRandomBatch(bootStrapContext.getAllocator(), 5000)));
        }
        Assert.assertTrue(atomicLong.get() > 2700);
        Thread.sleep(5000L);
    }

    private static WritableBatch getRandomBatch(BufferAllocator bufferAllocator, int i) {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i2 = 0; i2 < 5; i2++) {
            Float8Vector newVector = TypeHelper.getNewVector(MaterializedField.create("a", Types.required(TypeProtos.MinorType.FLOAT8)), bufferAllocator);
            newVector.allocateNew(i);
            newVector.getMutator().generateTestData(i);
            newArrayList.add(newVector);
        }
        return WritableBatch.getBatchNoHV(i, newArrayList, false);
    }
}
