package org.apache.spark.network;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/network/TransportClientFactorySuite.class */
public class TransportClientFactorySuite {
    private TransportConf conf;
    private TransportContext context;
    private TransportServer server1;
    private TransportServer server2;

    @Before
    public void setUp() {
        this.conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
        this.context = new TransportContext(this.conf, new NoOpRpcHandler());
        this.server1 = this.context.createServer();
        this.server2 = this.context.createServer();
    }

    @After
    public void tearDown() {
        JavaUtils.closeQuietly(this.server1);
        JavaUtils.closeQuietly(this.server2);
    }

    private void testClientReuse(int i, boolean z) throws IOException, InterruptedException {
        HashMap hashMap = new HashMap();
        hashMap.put("spark.shuffle.io.numConnectionsPerPeer", Integer.toString(i));
        TransportClientFactory createClientFactory = new TransportContext(new TransportConf("shuffle", new MapConfigProvider(hashMap)), new NoOpRpcHandler()).createClientFactory();
        Set synchronizedSet = Collections.synchronizedSet(new HashSet());
        AtomicInteger atomicInteger = new AtomicInteger();
        Thread[] threadArr = new Thread[i * 10];
        for (int i2 = 0; i2 < threadArr.length; i2++) {
            threadArr[i2] = new Thread(() -> {
                try {
                    TransportClient createClient = createClientFactory.createClient(TestUtils.getLocalHost(), this.server1.getPort());
                    Assert.assertTrue(createClient.isActive());
                    synchronizedSet.add(createClient);
                } catch (IOException e) {
                    atomicInteger.incrementAndGet();
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            });
            if (z) {
                threadArr[i2].start();
            } else {
                threadArr[i2].run();
            }
        }
        for (Thread thread : threadArr) {
            thread.join();
        }
        Assert.assertEquals(0L, atomicInteger.get());
        Assert.assertTrue(synchronizedSet.size() <= i);
        Iterator it = synchronizedSet.iterator();
        while (it.hasNext()) {
            ((TransportClient) it.next()).close();
        }
        createClientFactory.close();
    }

    @Test
    public void reuseClientsUpToConfigVariable() throws Exception {
        testClientReuse(1, false);
        testClientReuse(2, false);
        testClientReuse(3, false);
        testClientReuse(4, false);
    }

    @Test
    public void reuseClientsUpToConfigVariableConcurrent() throws Exception {
        testClientReuse(1, true);
        testClientReuse(2, true);
        testClientReuse(3, true);
        testClientReuse(4, true);
    }

    @Test
    public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
        TransportClientFactory createClientFactory = this.context.createClientFactory();
        TransportClient createClient = createClientFactory.createClient(TestUtils.getLocalHost(), this.server1.getPort());
        TransportClient createClient2 = createClientFactory.createClient(TestUtils.getLocalHost(), this.server2.getPort());
        Assert.assertTrue(createClient.isActive());
        Assert.assertTrue(createClient2.isActive());
        Assert.assertNotSame(createClient, createClient2);
        createClientFactory.close();
    }

    @Test
    public void neverReturnInactiveClients() throws IOException, InterruptedException {
        TransportClientFactory createClientFactory = this.context.createClientFactory();
        TransportClient createClient = createClientFactory.createClient(TestUtils.getLocalHost(), this.server1.getPort());
        createClient.close();
        long currentTimeMillis = System.currentTimeMillis();
        while (createClient.isActive() && System.currentTimeMillis() - currentTimeMillis < 3000) {
            Thread.sleep(10L);
        }
        Assert.assertFalse(createClient.isActive());
        TransportClient createClient2 = createClientFactory.createClient(TestUtils.getLocalHost(), this.server1.getPort());
        Assert.assertNotSame(createClient, createClient2);
        Assert.assertTrue(createClient2.isActive());
        createClientFactory.close();
    }

    @Test
    public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
        TransportClientFactory createClientFactory = this.context.createClientFactory();
        TransportClient createClient = createClientFactory.createClient(TestUtils.getLocalHost(), this.server1.getPort());
        TransportClient createClient2 = createClientFactory.createClient(TestUtils.getLocalHost(), this.server2.getPort());
        Assert.assertTrue(createClient.isActive());
        Assert.assertTrue(createClient2.isActive());
        createClientFactory.close();
        Assert.assertFalse(createClient.isActive());
        Assert.assertFalse(createClient2.isActive());
    }

    @Test
    public void closeIdleConnectionForRequestTimeOut() throws IOException, InterruptedException {
        TransportClientFactory createClientFactory = new TransportContext(new TransportConf("shuffle", new ConfigProvider() { // from class: org.apache.spark.network.TransportClientFactorySuite.1
            public String get(String str) {
                if ("spark.shuffle.io.connectionTimeout".equals(str)) {
                    return "1s";
                }
                String property = System.getProperty(str);
                if (property == null) {
                    throw new NoSuchElementException(str);
                }
                return property;
            }

            public Iterable<Map.Entry<String, String>> getAll() {
                throw new UnsupportedOperationException();
            }
        }), new NoOpRpcHandler(), true).createClientFactory();
        try {
            TransportClient createClient = createClientFactory.createClient(TestUtils.getLocalHost(), this.server1.getPort());
            Assert.assertTrue(createClient.isActive());
            long currentTimeMillis = System.currentTimeMillis() + 10000;
            while (createClient.isActive() && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(10L);
            }
            Assert.assertFalse(createClient.isActive());
            if (createClientFactory != null) {
                createClientFactory.close();
            }
        } catch (Throwable th) {
            if (createClientFactory != null) {
                try {
                    createClientFactory.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(expected = IOException.class)
    public void closeFactoryBeforeCreateClient() throws IOException, InterruptedException {
        TransportClientFactory createClientFactory = this.context.createClientFactory();
        createClientFactory.close();
        createClientFactory.createClient(TestUtils.getLocalHost(), this.server1.getPort());
    }
}
