package org.apache.drill.exec.rpc.control;

import io.netty.buffer.ByteBuf;
import java.util.concurrent.CountDownLatch;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.test.BaseTest;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/drill/exec/rpc/control/TestLocalControlConnectionManager.class */
public class TestLocalControlConnectionManager extends BaseTest {
    private static ControlConnectionConfig mockConfig;
    private static ControlMessageHandler mockHandler;
    private static ControlTunnel controlTunnel;
    private static CountDownLatch latch;
    private static final String NEGATIVE_ACK_MESSAGE = "Negative Ack received";

    @Rule
    public ExpectedException exceptionThrown = ExpectedException.none();
    private static final CoordinationProtos.DrillbitEndpoint localEndpoint = CoordinationProtos.DrillbitEndpoint.newBuilder().setAddress("10.0.0.1").setControlPort(31011).setState(CoordinationProtos.DrillbitEndpoint.State.STARTUP).build();
    private static final RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener = new RpcOutcomeListener<GeneralRPCProtos.Ack>() { // from class: org.apache.drill.exec.rpc.control.TestLocalControlConnectionManager.1
        public void failed(RpcException rpcException) {
            throw new IllegalStateException((Throwable) rpcException);
        }

        public void success(GeneralRPCProtos.Ack ack, ByteBuf byteBuf) {
            if (!ack.getOk()) {
                throw new IllegalStateException(TestLocalControlConnectionManager.NEGATIVE_ACK_MESSAGE);
            }
            TestLocalControlConnectionManager.latch.countDown();
        }

        public void interrupted(InterruptedException interruptedException) {
        }
    };

    @BeforeClass
    public static void setup() {
        mockConfig = (ControlConnectionConfig) Mockito.mock(ControlConnectionConfig.class);
        ConnectionManagerRegistry connectionManagerRegistry = new ConnectionManagerRegistry(mockConfig);
        connectionManagerRegistry.setLocalEndpoint(localEndpoint);
        ControlConnectionManager connectionManager = connectionManagerRegistry.getConnectionManager(localEndpoint);
        Assert.assertTrue(connectionManager instanceof LocalControlConnectionManager);
        controlTunnel = new ControlTunnel(connectionManager);
    }

    @Before
    public void setupForTest() {
        mockHandler = (ControlMessageHandler) Mockito.mock(ControlMessageHandler.class);
        Mockito.when(mockConfig.getMessageHandler()).thenReturn(mockHandler);
    }

    @Test
    public void testLocalSendFragmentStatus_Success() throws Exception {
        UserBitShared.QueryId defaultInstance = UserBitShared.QueryId.getDefaultInstance();
        UserBitShared.QueryProfile defaultInstance2 = UserBitShared.QueryProfile.getDefaultInstance();
        Mockito.when(mockHandler.requestQueryStatus(defaultInstance)).thenReturn(defaultInstance2);
        Assert.assertEquals((UserBitShared.QueryProfile) controlTunnel.requestQueryProfile(defaultInstance).checkedGet(), defaultInstance2);
    }

    @Test
    public void testLocalSendFragmentStatus_Failure() throws Exception {
        UserBitShared.QueryId defaultInstance = UserBitShared.QueryId.getDefaultInstance();
        this.exceptionThrown.expect(RpcException.class);
        this.exceptionThrown.expectMessage("Testing failure case");
        Mockito.when(mockHandler.requestQueryStatus(defaultInstance)).thenThrow(new Throwable[]{new RpcException("Testing failure case")});
        controlTunnel.requestQueryProfile(defaultInstance).checkedGet();
    }

    @Test
    public void testLocalCancelFragment_PositiveAck() throws Exception {
        ExecProtos.FragmentHandle defaultInstance = ExecProtos.FragmentHandle.getDefaultInstance();
        latch = new CountDownLatch(1);
        Mockito.when(mockHandler.cancelFragment(defaultInstance)).thenReturn(Acks.OK);
        controlTunnel.cancelFragment(outcomeListener, defaultInstance);
        latch.await();
    }

    @Test
    public void testLocalCancelFragment_NegativeAck() throws Exception {
        ExecProtos.FragmentHandle defaultInstance = ExecProtos.FragmentHandle.getDefaultInstance();
        latch = new CountDownLatch(1);
        this.exceptionThrown.expect(IllegalStateException.class);
        this.exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE);
        Mockito.when(mockHandler.cancelFragment(defaultInstance)).thenReturn(Acks.FAIL);
        controlTunnel.cancelFragment(outcomeListener, defaultInstance);
        latch.await();
    }

    @Test
    public void testLocalSendFragments_PositiveAck() throws Exception {
        BitControl.InitializeFragments defaultInstance = BitControl.InitializeFragments.getDefaultInstance();
        latch = new CountDownLatch(1);
        Mockito.when(mockHandler.initializeFragment(defaultInstance)).thenReturn(Acks.OK);
        controlTunnel.sendFragments(outcomeListener, defaultInstance);
        latch.await();
    }

    @Test
    public void testLocalSendFragments_NegativeAck() throws Exception {
        BitControl.InitializeFragments defaultInstance = BitControl.InitializeFragments.getDefaultInstance();
        latch = new CountDownLatch(1);
        this.exceptionThrown.expect(IllegalStateException.class);
        this.exceptionThrown.expectMessage(NEGATIVE_ACK_MESSAGE);
        Mockito.when(mockHandler.initializeFragment(defaultInstance)).thenReturn(Acks.FAIL);
        controlTunnel.sendFragments(outcomeListener, defaultInstance);
        latch.await();
    }

    @Test
    public void testLocalSendFragments_Failure() throws Exception {
        BitControl.InitializeFragments defaultInstance = BitControl.InitializeFragments.getDefaultInstance();
        latch = new CountDownLatch(1);
        this.exceptionThrown.expect(IllegalStateException.class);
        this.exceptionThrown.expectCause(new TypeSafeMatcher<Throwable>(RpcException.class) { // from class: org.apache.drill.exec.rpc.control.TestLocalControlConnectionManager.2
            /* JADX INFO: Access modifiers changed from: protected */
            public boolean matchesSafely(Throwable th) {
                return th != null && (th instanceof RpcException);
            }

            public void describeTo(Description description) {
            }
        });
        Mockito.when(mockHandler.initializeFragment(defaultInstance)).thenThrow(new Throwable[]{new RpcException("Failed to initialize")});
        controlTunnel.sendFragments(outcomeListener, defaultInstance);
        latch.await();
    }

    @Test
    public void testUnpauseFragments() throws Exception {
        ExecProtos.FragmentHandle defaultInstance = ExecProtos.FragmentHandle.getDefaultInstance();
        latch = new CountDownLatch(1);
        Mockito.when(mockHandler.resumeFragment(defaultInstance)).thenReturn(Acks.OK);
        controlTunnel.unpauseFragment(outcomeListener, defaultInstance);
        latch.await();
    }

    @Test
    public void testRequestQueryStatus() throws Exception {
        UserBitShared.QueryId defaultInstance = UserBitShared.QueryId.getDefaultInstance();
        UserBitShared.QueryProfile defaultInstance2 = UserBitShared.QueryProfile.getDefaultInstance();
        Mockito.when(mockHandler.requestQueryStatus(defaultInstance)).thenReturn(defaultInstance2);
        Assert.assertEquals((UserBitShared.QueryProfile) controlTunnel.requestQueryProfile(defaultInstance).checkedGet(), defaultInstance2);
    }

    @Test
    public void testCancelQuery_PositiveAck() throws Exception {
        UserBitShared.QueryId defaultInstance = UserBitShared.QueryId.getDefaultInstance();
        GeneralRPCProtos.Ack ack = Acks.OK;
        Mockito.when(mockHandler.requestQueryCancel(defaultInstance)).thenReturn(ack);
        Assert.assertEquals((GeneralRPCProtos.Ack) controlTunnel.requestCancelQuery(defaultInstance).checkedGet(), ack);
    }

    @Test
    public void testCancelQuery_NegativeAck() throws Exception {
        UserBitShared.QueryId defaultInstance = UserBitShared.QueryId.getDefaultInstance();
        GeneralRPCProtos.Ack ack = Acks.FAIL;
        Mockito.when(mockHandler.requestQueryCancel(defaultInstance)).thenReturn(ack);
        Assert.assertEquals((GeneralRPCProtos.Ack) controlTunnel.requestCancelQuery(defaultInstance).checkedGet(), ack);
    }

    @Test
    public void testInformReceiverFinished_success() throws Exception {
        BitControl.FinishedReceiver defaultInstance = BitControl.FinishedReceiver.getDefaultInstance();
        latch = new CountDownLatch(1);
        Mockito.when(mockHandler.receivingFragmentFinished(defaultInstance)).thenReturn(Acks.OK);
        controlTunnel.informReceiverFinished(outcomeListener, defaultInstance);
        latch.await();
    }
}
