package org.apache.spark.network;

import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/network/ChunkFetchIntegrationSuite.class */
/**
 * Integration test that fetches chunks from a {@link TransportServer} through a
 * {@link TransportClient} and compares what arrives with the buffers the server
 * side was set up to serve.
 *
 * <p>The server exposes one stream ({@link #STREAM_ID}) with two chunks: an
 * in-memory buffer at {@link #BUFFER_CHUNK_INDEX} and a segment of a temporary
 * file at {@link #FILE_CHUNK_INDEX}. Any other chunk index is rejected by the
 * stream manager.
 */
public class ChunkFetchIntegrationSuite {
    static final long STREAM_ID = 1;
    static final int BUFFER_CHUNK_INDEX = 0;
    static final int FILE_CHUNK_INDEX = 1;
    static TransportServer server;
    static TransportClientFactory clientFactory;
    static StreamManager streamManager;
    static File testFile;
    static ManagedBuffer bufferChunk;
    static ManagedBuffer fileChunk;

    /**
     * Outcome of one {@link #fetchChunks(List)} call: which chunk indices
     * succeeded, which failed, and the buffers received for the successes.
     */
    public static class FetchResult {
        public Set<Integer> successChunks;
        public Set<Integer> failedChunks;
        public List<ManagedBuffer> buffers;

        FetchResult() {
        }

        /** Releases every buffer that was retained while collecting this result. */
        public void releaseBuffers() {
            for (ManagedBuffer buffer : this.buffers) {
                buffer.release();
            }
        }
    }

    /**
     * Builds the two reference chunks, then starts a server whose stream manager
     * serves equivalent chunks, plus a client factory for the tests to use.
     *
     * @throws Exception if the temporary file cannot be created or written, or
     *                   if the server or client factory cannot be created
     */
    @BeforeClass
    public static void setUp() throws Exception {
        final int bufferSize = 100000;
        final ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        for (int i = 0; i < bufferSize; i++) {
            // Deterministic content: the low byte of the position.
            buf.put((byte) i);
        }
        buf.flip();
        bufferChunk = new NioManagedBuffer(buf);

        testFile = File.createTempFile("shuffle-test-file", "txt");
        testFile.deleteOnExit();
        RandomAccessFile fp = new RandomAccessFile(testFile, "rw");
        // Stays true until the write has succeeded, so that a failure while closing
        // cannot replace an exception thrown by the write itself.
        boolean swallowCloseFailure = true;
        try {
            byte[] fileContent = new byte[1024];
            new Random().nextBytes(fileContent);
            fp.write(fileContent);
            swallowCloseFailure = false;
        } finally {
            Closeables.close(fp, swallowCloseFailure);
        }

        final TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
        // The file chunk skips the first 10 bytes and stops 15 bytes before the end.
        fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10L, testFile.length() - 25);

        streamManager = new StreamManager() {
            public ManagedBuffer getChunk(long streamId, int chunkIndex) {
                Assert.assertEquals(ChunkFetchIntegrationSuite.STREAM_ID, streamId);
                if (chunkIndex == ChunkFetchIntegrationSuite.BUFFER_CHUNK_INDEX) {
                    return new NioManagedBuffer(buf);
                }
                if (chunkIndex == ChunkFetchIntegrationSuite.FILE_CHUNK_INDEX) {
                    return new FileSegmentManagedBuffer(conf, ChunkFetchIntegrationSuite.testFile, 10L, ChunkFetchIntegrationSuite.testFile.length() - 25);
                }
                throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
            }
        };

        RpcHandler rpcHandler = new RpcHandler() {
            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                // This suite only fetches chunks; RPCs are not expected.
                throw new UnsupportedOperationException();
            }

            public StreamManager getStreamManager() {
                return ChunkFetchIntegrationSuite.streamManager;
            }
        };
        TransportContext context = new TransportContext(conf, rpcHandler);
        server = context.createServer();
        clientFactory = context.createClientFactory();
    }

    /**
     * Releases whatever {@link #setUp()} managed to create. Each field is
     * null-checked because this method still runs when set-up failed part-way,
     * and a NullPointerException here would hide the original failure.
     */
    @AfterClass
    public static void tearDown() {
        if (bufferChunk != null) {
            bufferChunk.release();
        }
        if (server != null) {
            server.close();
        }
        if (clientFactory != null) {
            clientFactory.close();
        }
        if (testFile != null) {
            testFile.delete();
        }
    }

    /**
     * Requests the given chunk indices of {@link #STREAM_ID} over a fresh client
     * and waits up to 120 seconds for one callback per request.
     *
     * @param list chunk indices to request, in order
     * @return the indices that succeeded and failed, plus the retained buffers of
     *         the successful ones (the caller must release them)
     * @throws Exception if the client cannot be created or the wait is interrupted
     */
    private FetchResult fetchChunks(List<Integer> list) throws Exception {
        TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        final FetchResult result = new FetchResult();
        try {
            // Starts with no permits; every callback, success or failure, adds one.
            final Semaphore responses = new Semaphore(0);
            // The callbacks fill these collections, so they are wrapped in synchronized views.
            result.successChunks = Collections.synchronizedSet(new HashSet<Integer>());
            result.failedChunks = Collections.synchronizedSet(new HashSet<Integer>());
            result.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());

            ChunkReceivedCallback callback = new ChunkReceivedCallback() {
                public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
                    // Retained here; handed back via FetchResult.releaseBuffers().
                    buffer.retain();
                    result.successChunks.add(chunkIndex);
                    result.buffers.add(buffer);
                    responses.release();
                }

                public void onFailure(int chunkIndex, Throwable cause) {
                    result.failedChunks.add(chunkIndex);
                    responses.release();
                }
            };

            for (Integer chunkIndex : list) {
                client.fetchChunk(STREAM_ID, chunkIndex.intValue(), callback);
            }
            if (!responses.tryAcquire(list.size(), 120L, TimeUnit.SECONDS)) {
                Assert.fail("Timeout getting response from the server");
            }
        } finally {
            // Close on every path so a timeout or a failed request does not leak the client.
            client.close();
        }
        return result;
    }

    /** Fetching the in-memory chunk succeeds and matches {@link #bufferChunk}. */
    @Test
    public void fetchBufferChunk() throws Exception {
        FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX));
        Assert.assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX), res.successChunks);
        Assert.assertTrue(res.failedChunks.isEmpty());
        assertBufferListsEqual(Arrays.asList(bufferChunk), res.buffers);
        res.releaseBuffers();
    }

    /** Fetching the file-backed chunk succeeds and matches {@link #fileChunk}. */
    @Test
    public void fetchFileChunk() throws Exception {
        FetchResult res = fetchChunks(Arrays.asList(FILE_CHUNK_INDEX));
        Assert.assertEquals(Sets.newHashSet(FILE_CHUNK_INDEX), res.successChunks);
        Assert.assertTrue(res.failedChunks.isEmpty());
        assertBufferListsEqual(Arrays.asList(fileChunk), res.buffers);
        res.releaseBuffers();
    }

    /** A chunk index the stream manager rejects is reported as a failure. */
    @Test
    public void fetchNonExistentChunk() throws Exception {
        FetchResult res = fetchChunks(Arrays.asList(12345));
        Assert.assertTrue(res.successChunks.isEmpty());
        Assert.assertEquals(Sets.newHashSet(12345), res.failedChunks);
        Assert.assertTrue(res.buffers.isEmpty());
    }

    /** Both chunks can be fetched over one client; buffers are compared in request order. */
    @Test
    public void fetchBothChunks() throws Exception {
        FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX));
        Assert.assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX, FILE_CHUNK_INDEX), res.successChunks);
        Assert.assertTrue(res.failedChunks.isEmpty());
        assertBufferListsEqual(Arrays.asList(bufferChunk, fileChunk), res.buffers);
        res.releaseBuffers();
    }

    /** A valid and an invalid index in one batch yield one success and one failure. */
    @Test
    public void fetchChunkAndNonExistent() throws Exception {
        FetchResult res = fetchChunks(Arrays.asList(BUFFER_CHUNK_INDEX, 12345));
        Assert.assertEquals(Sets.newHashSet(BUFFER_CHUNK_INDEX), res.successChunks);
        Assert.assertEquals(Sets.newHashSet(12345), res.failedChunks);
        assertBufferListsEqual(Arrays.asList(bufferChunk), res.buffers);
        res.releaseBuffers();
    }

    /** Asserts that both lists have the same size and pairwise-equal buffer contents. */
    private static void assertBufferListsEqual(List<ManagedBuffer> list, List<ManagedBuffer> list2) throws Exception {
        Assert.assertEquals(list.size(), list2.size());
        for (int i = 0; i < list.size(); i++) {
            assertBuffersEqual(list.get(i), list2.get(i));
        }
    }

    /** Asserts that both buffers expose the same number of bytes with identical values. */
    private static void assertBuffersEqual(ManagedBuffer managedBuffer, ManagedBuffer managedBuffer2) throws Exception {
        ByteBuffer expected = managedBuffer.nioByteBuffer();
        ByteBuffer actual = managedBuffer2.nioByteBuffer();
        int length = expected.remaining();
        Assert.assertEquals(expected.remaining(), actual.remaining());
        for (int i = 0; i < length; i++) {
            Assert.assertEquals(expected.get(), actual.get());
        }
    }
}
