package org.apache.spark.network;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/network/RequestTimeoutIntegrationSuite.class */
public class RequestTimeoutIntegrationSuite {
    private TransportServer server;
    private TransportClientFactory clientFactory;
    private StreamManager defaultManager;
    private TransportConf conf;
    private static final int FOREVER = 60000;

    /* loaded from: input_file:org/apache/spark/network/RequestTimeoutIntegrationSuite$TestCallback.class */
    static class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {
        Throwable failure;
        int successLength = -1;
        final CountDownLatch latch = new CountDownLatch(1);

        TestCallback() {
        }

        public void onSuccess(ByteBuffer byteBuffer) {
            this.successLength = byteBuffer.remaining();
            this.latch.countDown();
        }

        public void onFailure(Throwable th) {
            this.failure = th;
            this.latch.countDown();
        }

        public void onSuccess(int i, ManagedBuffer managedBuffer) {
            try {
                this.successLength = managedBuffer.nioByteBuffer().remaining();
                this.latch.countDown();
            } catch (IOException e) {
                this.latch.countDown();
            } catch (Throwable th) {
                this.latch.countDown();
                throw th;
            }
        }

        public void onFailure(int i, Throwable th) {
            this.failure = th;
            this.latch.countDown();
        }
    }

    @Before
    public void setUp() throws Exception {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("spark.shuffle.io.connectionTimeout", "10s");
        this.conf = new TransportConf("shuffle", new MapConfigProvider(newHashMap));
        this.defaultManager = new StreamManager() { // from class: org.apache.spark.network.RequestTimeoutIntegrationSuite.1
            public ManagedBuffer getChunk(long j, int i) {
                throw new UnsupportedOperationException();
            }
        };
    }

    @After
    public void tearDown() {
        if (this.server != null) {
            this.server.close();
        }
        if (this.clientFactory != null) {
            this.clientFactory.close();
        }
    }

    @Test
    public void timeoutInactiveRequests() throws Exception {
        final Semaphore semaphore = new Semaphore(1);
        TransportContext transportContext = new TransportContext(this.conf, new RpcHandler() { // from class: org.apache.spark.network.RequestTimeoutIntegrationSuite.2
            public void receive(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
                try {
                    semaphore.acquire();
                    rpcResponseCallback.onSuccess(ByteBuffer.allocate(16));
                } catch (InterruptedException e) {
                }
            }

            public StreamManager getStreamManager() {
                return RequestTimeoutIntegrationSuite.this.defaultManager;
            }
        });
        this.server = transportContext.createServer();
        this.clientFactory = transportContext.createClientFactory();
        TransportClient createClient = this.clientFactory.createClient(TestUtils.getLocalHost(), this.server.getPort());
        TestCallback testCallback = new TestCallback();
        createClient.sendRpc(ByteBuffer.allocate(0), testCallback);
        testCallback.latch.await();
        Assert.assertEquals(16L, testCallback.successLength);
        TestCallback testCallback2 = new TestCallback();
        createClient.sendRpc(ByteBuffer.allocate(0), testCallback2);
        testCallback2.latch.await(60L, TimeUnit.SECONDS);
        Assert.assertNotNull(testCallback2.failure);
        Assert.assertTrue(testCallback2.failure instanceof IOException);
        semaphore.release();
    }

    @Test
    public void timeoutCleanlyClosesClient() throws Exception {
        final Semaphore semaphore = new Semaphore(0);
        TransportContext transportContext = new TransportContext(this.conf, new RpcHandler() { // from class: org.apache.spark.network.RequestTimeoutIntegrationSuite.3
            public void receive(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
                try {
                    semaphore.acquire();
                    rpcResponseCallback.onSuccess(ByteBuffer.allocate(16));
                } catch (InterruptedException e) {
                }
            }

            public StreamManager getStreamManager() {
                return RequestTimeoutIntegrationSuite.this.defaultManager;
            }
        });
        this.server = transportContext.createServer();
        this.clientFactory = transportContext.createClientFactory();
        TransportClient createClient = this.clientFactory.createClient(TestUtils.getLocalHost(), this.server.getPort());
        TestCallback testCallback = new TestCallback();
        createClient.sendRpc(ByteBuffer.allocate(0), testCallback);
        testCallback.latch.await();
        Assert.assertTrue(testCallback.failure instanceof IOException);
        Assert.assertFalse(createClient.isActive());
        semaphore.release(2);
        TransportClient createClient2 = this.clientFactory.createClient(TestUtils.getLocalHost(), this.server.getPort());
        TestCallback testCallback2 = new TestCallback();
        createClient2.sendRpc(ByteBuffer.allocate(0), testCallback2);
        testCallback2.latch.await();
        Assert.assertEquals(16L, testCallback2.successLength);
        Assert.assertNull(testCallback2.failure);
    }

    @Test
    public void furtherRequestsDelay() throws Exception {
        final byte[] bArr = new byte[16];
        final StreamManager streamManager = new StreamManager() { // from class: org.apache.spark.network.RequestTimeoutIntegrationSuite.4
            public ManagedBuffer getChunk(long j, int i) {
                Uninterruptibles.sleepUninterruptibly(60000L, TimeUnit.MILLISECONDS);
                return new NioManagedBuffer(ByteBuffer.wrap(bArr));
            }
        };
        TransportContext transportContext = new TransportContext(this.conf, new RpcHandler() { // from class: org.apache.spark.network.RequestTimeoutIntegrationSuite.5
            public void receive(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
                throw new UnsupportedOperationException();
            }

            public StreamManager getStreamManager() {
                return streamManager;
            }
        });
        this.server = transportContext.createServer();
        this.clientFactory = transportContext.createClientFactory();
        TransportClient createClient = this.clientFactory.createClient(TestUtils.getLocalHost(), this.server.getPort());
        TestCallback testCallback = new TestCallback();
        createClient.fetchChunk(0L, 0, testCallback);
        Uninterruptibles.sleepUninterruptibly(1200L, TimeUnit.MILLISECONDS);
        TestCallback testCallback2 = new TestCallback();
        createClient.fetchChunk(0L, 1, testCallback2);
        Uninterruptibles.sleepUninterruptibly(1200L, TimeUnit.MILLISECONDS);
        Assert.assertEquals(-1L, testCallback.successLength);
        Assert.assertNull(testCallback.failure);
        testCallback.latch.await(60L, TimeUnit.SECONDS);
        Assert.assertTrue(testCallback.failure instanceof IOException);
        Assert.assertTrue(testCallback2.failure instanceof IOException);
    }
}
