package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.class */
public class TestConnectionManager {
    private Configuration conf;
    private ConnectionManager connManager;
    private static final String[] TEST_GROUP = {"TEST_GROUP"};
    private static final UserGroupInformation TEST_USER1 = UserGroupInformation.createUserForTesting("user1", TEST_GROUP);
    private static final UserGroupInformation TEST_USER2 = UserGroupInformation.createUserForTesting("user2", TEST_GROUP);
    private static final UserGroupInformation TEST_USER3 = UserGroupInformation.createUserForTesting("user3", TEST_GROUP);
    private static final String TEST_NN_ADDRESS = "nn1:8080";
    private static final String UNRESOLVED_TEST_NN_ADDRESS = "unknownhost:8080";

    @Rule
    public ExpectedException exceptionRule = ExpectedException.none();

    @Before
    public void setup() throws Exception {
        this.conf = new Configuration();
        this.connManager = new ConnectionManager(this.conf);
        NetUtils.addStaticResolution("nn1", "localhost");
        NetUtils.createSocketAddrForHost("nn1", 8080);
        this.connManager.start();
    }

    @After
    public void shutdown() {
        if (this.connManager != null) {
            this.connManager.close();
        }
    }

    @Test
    public void testCleanup() throws Exception {
        Map pools = this.connManager.getPools();
        ConnectionPool connectionPool = new ConnectionPool(this.conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, (PoolAlignmentContext) null);
        addConnectionsToPool(connectionPool, 9, 4);
        pools.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), connectionPool);
        ConnectionPool connectionPool2 = new ConnectionPool(this.conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class, (PoolAlignmentContext) null);
        addConnectionsToPool(connectionPool2, 10, 10);
        pools.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class), connectionPool2);
        checkPoolConnections(TEST_USER1, 9, 4);
        checkPoolConnections(TEST_USER2, 10, 10);
        this.connManager.cleanup(connectionPool);
        checkPoolConnections(TEST_USER1, 8, 4);
        checkPoolConnections(TEST_USER2, 10, 10);
        this.connManager.cleanup(connectionPool);
        checkPoolConnections(TEST_USER1, 8, 4);
        checkPoolConnections(TEST_USER2, 10, 10);
        ConnectionPool connectionPool3 = new ConnectionPool(this.conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class, (PoolAlignmentContext) null);
        addConnectionsToPool(connectionPool3, 8, 0);
        pools.put(new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class), connectionPool3);
        checkPoolConnections(TEST_USER3, 10, 0);
        for (int i = 0; i < 10; i++) {
            this.connManager.cleanup(connectionPool3);
        }
        checkPoolConnections(TEST_USER3, 2, 0);
        addConnectionsToPool(connectionPool3, 8, 2);
        checkPoolConnections(TEST_USER3, 10, 2);
        for (int i2 = 0; i2 < 10; i2++) {
            this.connManager.cleanup(connectionPool3);
        }
        checkPoolConnections(TEST_USER3, 4, 2);
    }

    @Test
    public void testGetConnectionWithConcurrency() throws Exception {
        Map pools = this.connManager.getPools();
        Configuration configuration = new Configuration(this.conf);
        configuration.setInt("dfs.federation.router.max.concurrency.per.connection", 20);
        ConnectionPool connectionPool = new ConnectionPool(configuration, TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, ClientProtocol.class, (PoolAlignmentContext) null);
        pools.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), connectionPool);
        Assert.assertEquals(1L, connectionPool.getNumConnections());
        for (int i = 0; i < 20; i++) {
            ConnectionContext connection = connectionPool.getConnection();
            Assert.assertTrue(connection.isUsable());
            connection.getClient();
        }
        Assert.assertEquals(1L, connectionPool.getNumConnections());
        ConnectionContext connection2 = connectionPool.getConnection();
        Assert.assertTrue(connection2.isActive());
        Assert.assertFalse(connection2.isUsable());
        connectionPool.addConnection(connectionPool.newConnection());
        ConnectionContext connection3 = connectionPool.getConnection();
        Assert.assertTrue(connection3.isUsable());
        connection3.getClient();
        Assert.assertEquals(2L, connectionPool.getNumConnections());
        checkPoolConnections(TEST_USER1, 2, 2);
    }

    @Test
    public void testConnectionCreatorWithException() throws Exception {
        ConnectionPool connectionPool = new ConnectionPool(this.conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, (PoolAlignmentContext) null);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
        arrayBlockingQueue.add(connectionPool);
        ConnectionManager.ConnectionCreator connectionCreator = new ConnectionManager.ConnectionCreator(arrayBlockingQueue);
        connectionCreator.setDaemon(true);
        connectionCreator.start();
        Objects.requireNonNull(arrayBlockingQueue);
        GenericTestUtils.waitFor(arrayBlockingQueue::isEmpty, 50L, 5000L);
        Assert.assertTrue(arrayBlockingQueue.isEmpty());
        Assert.assertTrue(connectionCreator.isAlive());
        connectionCreator.interrupt();
    }

    @Test
    public void testGetConnectionWithException() throws Exception {
        this.exceptionRule.expect(IllegalArgumentException.class);
        this.exceptionRule.expectMessage("java.net.UnknownHostException: unknownhost");
        new ConnectionPool(this.conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, ClientProtocol.class, (PoolAlignmentContext) null);
    }

    @Test
    public void testGetConnection() throws Exception {
        Map pools = this.connManager.getPools();
        int i = 5;
        ConnectionPool connectionPool = new ConnectionPool(this.conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, (PoolAlignmentContext) null);
        addConnectionsToPool(connectionPool, 10, 5);
        pools.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), connectionPool);
        int i2 = 10 - 5;
        for (int i3 = 0; i3 < i2; i3++) {
            ConnectionContext connection = connectionPool.getConnection();
            Assert.assertTrue(connection.isUsable());
            connection.getClient();
            i++;
        }
        checkPoolConnections(TEST_USER1, 10, i);
        Assert.assertTrue(connectionPool.getConnection().isActive());
    }

    @Test
    public void testValidClientIndex() throws Exception {
        ConnectionPool connectionPool = new ConnectionPool(this.conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class, (PoolAlignmentContext) null);
        for (int i = -3; i <= 3; i++) {
            connectionPool.getClientIndex().set(i);
            ConnectionContext connection = connectionPool.getConnection();
            Assert.assertNotNull(connection);
            Assert.assertTrue(connection.isUsable());
        }
    }

    @Test
    public void getGetConnectionNamenodeProtocol() throws Exception {
        Map pools = this.connManager.getPools();
        int i = 5;
        ConnectionPool connectionPool = new ConnectionPool(this.conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class, (PoolAlignmentContext) null);
        addConnectionsToPool(connectionPool, 10, 5);
        pools.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class), connectionPool);
        int i2 = 10 - 5;
        for (int i3 = 0; i3 < i2; i3++) {
            ConnectionContext connection = connectionPool.getConnection();
            Assert.assertTrue(connection.isUsable());
            connection.getClient();
            i++;
        }
        checkPoolConnections(TEST_USER1, 10, i);
        Assert.assertTrue(connectionPool.getConnection().isActive());
    }

    private void addConnectionsToPool(ConnectionPool connectionPool, int i, int i2) throws IOException {
        for (int i3 = 0; i3 < i; i3++) {
            ConnectionContext newConnection = connectionPool.newConnection();
            connectionPool.addConnection(newConnection);
            if (i3 < i2) {
                newConnection.getClient();
            }
        }
    }

    private void checkPoolConnections(UserGroupInformation userGroupInformation, int i, int i2) {
        boolean z = false;
        Iterator it = this.connManager.getPools().entrySet().iterator();
        while (it.hasNext()) {
            if (((ConnectionPoolId) ((Map.Entry) it.next()).getKey()).getUgi() == userGroupInformation) {
                Assert.assertEquals(i, ((ConnectionPool) r0.getValue()).getNumConnections());
                Assert.assertEquals(i2, ((ConnectionPool) r0.getValue()).getNumActiveConnections());
                Assert.assertEquals(i - i2, ((ConnectionPool) r0.getValue()).getNumIdleConnections());
                z = true;
            }
        }
        if (z) {
            return;
        }
        Assert.fail("Connection pool not found for user " + userGroupInformation.getUserName());
    }

    @Test
    public void testAdvanceClientStateId() throws IOException {
        ConnectionManager connectionManager = new ConnectionManager(new Configuration());
        connectionManager.start();
        Map pools = connectionManager.getPools();
        Server.Call call = new Server.Call(1, 1, (Void) null, (Void) null, RPC.RpcKind.RPC_BUILTIN, new byte[]{1, 2, 3});
        HashMap hashMap = new HashMap();
        hashMap.put("ns0", 1L);
        HdfsProtos.RouterFederatedStateProto.Builder newBuilder = HdfsProtos.RouterFederatedStateProto.newBuilder();
        Objects.requireNonNull(newBuilder);
        hashMap.forEach((v1, v2) -> {
            r1.putNamespaceStateIds(v1, v2);
        });
        call.setFederatedNamespaceState(newBuilder.build().toByteString());
        Server.getCurCall().set(call);
        connectionManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
        Assert.assertEquals(1L, pools.size());
        ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class);
        Assert.assertEquals(1L, ((ConnectionPool) pools.get(connectionPoolId)).getPoolAlignmentContext().getPoolLocalStateId());
        Server.Call call2 = new Server.Call(2, 1, (Void) null, (Void) null, RPC.RpcKind.RPC_BUILTIN, new byte[]{1, 2, 3});
        hashMap.clear();
        hashMap.put("ns0", 2L);
        HdfsProtos.RouterFederatedStateProto.Builder newBuilder2 = HdfsProtos.RouterFederatedStateProto.newBuilder();
        Objects.requireNonNull(newBuilder2);
        hashMap.forEach((v1, v2) -> {
            r1.putNamespaceStateIds(v1, v2);
        });
        call2.setFederatedNamespaceState(newBuilder2.build().toByteString());
        Server.getCurCall().set(call2);
        connectionManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
        Assert.assertEquals(1L, pools.size());
        Assert.assertEquals(2L, ((ConnectionPool) pools.get(connectionPoolId)).getPoolAlignmentContext().getPoolLocalStateId());
    }

    @Test
    public void testConfigureConnectionActiveRatio() throws IOException {
        testConnectionCleanup(0.8f, 10, 7, 9);
        testConnectionCleanup(0.8f, 10, 6, 8);
    }

    private void testConnectionCleanup(float f, int i, int i2, int i3) throws IOException {
        Configuration configuration = new Configuration();
        configuration.setFloat("dfs.federation.router.connection.min-active-ratio", f);
        ConnectionManager connectionManager = new ConnectionManager(configuration);
        connectionManager.start();
        connectionManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class, "ns0");
        ConnectionPool connectionPool = (ConnectionPool) connectionManager.getPools().get(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class));
        Assert.assertEquals(f, connectionPool.getMinActiveRatio(), 0.001f);
        connectionPool.getConnection().getClient();
        Assert.assertEquals(1L, connectionPool.getNumActiveConnections());
        addConnectionsToPool(connectionPool, i - 1, i2 - 1);
        connectionManager.cleanup(connectionPool);
        Assert.assertEquals(i3, connectionPool.getNumConnections());
        connectionManager.close();
    }

    @Test
    public void testUnsupportedProtoExceptionMsg() throws Exception {
        LambdaTestUtils.intercept(IllegalStateException.class, "Unsupported protocol for connection to NameNode: " + TestConnectionManager.class.getName(), () -> {
            return ConnectionPool.newConnection(this.conf, TEST_NN_ADDRESS, TEST_USER1, TestConnectionManager.class, false, 0, (AlignmentContext) null);
        });
    }
}
