/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.adl;

import com.squareup.okhttp.mockwebserver.Dispatcher;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import okio.Buffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.AdlMockWebServer;
import org.apache.hadoop.fs.adl.TestADLResponseData;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class TestConcurrentDataReadOperations
extends AdlMockWebServer {
    private static final Logger LOG = LoggerFactory.getLogger(TestConcurrentDataReadOperations.class);
    private static final Object LOCK = new Object();
    private static FSDataInputStream commonHandle = null;
    private int concurrencyLevel;

    public TestConcurrentDataReadOperations(int concurrencyLevel) {
        this.concurrencyLevel = concurrencyLevel;
    }

    @Parameterized.Parameters(name="{index}")
    public static Collection<?> testDataNumberOfConcurrentRun() {
        return Arrays.asList({1}, {2}, {3}, {4}, {5});
    }

    public static byte[] getRandomByteArrayData(int size) {
        byte[] b = new byte[size];
        Random rand = new Random();
        rand.nextBytes(b);
        return b;
    }

    private void setDispatcher(final ArrayList<CreateTestData> testData) {
        this.getMockServer().setDispatcher(new Dispatcher(){

            public MockResponse dispatch(RecordedRequest recordedRequest) throws InterruptedException {
                CreateTestData currentRequest = null;
                for (CreateTestData local : testData) {
                    if (!recordedRequest.getPath().contains(local.path.toString())) continue;
                    currentRequest = local;
                    break;
                }
                if (currentRequest == null) {
                    new MockResponse().setBody("Request data not found").setResponseCode(501);
                }
                if (recordedRequest.getRequestLine().contains("op=GETFILESTATUS")) {
                    return new MockResponse().setResponseCode(200).setBody(TestADLResponseData.getGetFileStatusJSONResponse(currentRequest.data.length));
                }
                if (recordedRequest.getRequestLine().contains("op=OPEN")) {
                    String request = recordedRequest.getRequestLine();
                    int offset = 0;
                    int byteCount = 0;
                    Pattern pattern = Pattern.compile("offset=([0-9]+)");
                    Matcher matcher = pattern.matcher(request);
                    if (matcher.find()) {
                        LOG.debug(matcher.group(1));
                        offset = Integer.parseInt(matcher.group(1));
                    }
                    if ((matcher = (pattern = Pattern.compile("length=([0-9]+)")).matcher(request)).find()) {
                        LOG.debug(matcher.group(1));
                        byteCount = Integer.parseInt(matcher.group(1));
                    }
                    Buffer buf = new Buffer();
                    buf.write(currentRequest.data, offset, Math.min(currentRequest.data.length - offset, byteCount));
                    return new MockResponse().setResponseCode(200).setChunkedBody(buf, 0x400000);
                }
                return new MockResponse().setBody("NOT SUPPORTED").setResponseCode(501);
            }
        });
    }

    @Before
    public void resetHandle() {
        commonHandle = null;
    }

    @Test
    public void testParallelReadOnDifferentStreams() throws IOException, InterruptedException, ExecutionException {
        ArrayList<CreateTestData> createTestData = new ArrayList<CreateTestData>();
        Random random = new Random();
        for (int i = 0; i < this.concurrencyLevel; ++i) {
            CreateTestData testData = new CreateTestData();
            testData.set(new Path("/test/concurrentRead/" + UUID.randomUUID().toString()), TestConcurrentDataReadOperations.getRandomByteArrayData(random.nextInt(0x100000)));
            createTestData.add(testData);
        }
        this.setDispatcher(createTestData);
        ArrayList<ReadTestData> readTestData = new ArrayList<ReadTestData>();
        for (CreateTestData local : createTestData) {
            ReadTestData localReadData = new ReadTestData();
            localReadData.set(local.path, local.data, 0);
            readTestData.add(localReadData);
        }
        this.runReadTest(readTestData, false);
    }

    @Test
    public void testParallelReadOnSameStreams() throws IOException, InterruptedException, ExecutionException {
        ArrayList<CreateTestData> createTestData = new ArrayList<CreateTestData>();
        Random random = new Random();
        for (int i = 0; i < 1; ++i) {
            CreateTestData testData = new CreateTestData();
            testData.set(new Path("/test/concurrentRead/" + UUID.randomUUID().toString()), TestConcurrentDataReadOperations.getRandomByteArrayData(0x100000));
            createTestData.add(testData);
        }
        this.setDispatcher(createTestData);
        ArrayList<ReadTestData> readTestData = new ArrayList<ReadTestData>();
        ByteArrayInputStream buffered = new ByteArrayInputStream(createTestData.get((int)0).data);
        ReadTestData readInitially = new ReadTestData();
        byte[] initialData = new byte[0x100000];
        buffered.read(initialData);
        readInitially.set(createTestData.get((int)0).path, initialData, 0);
        readTestData.add(readInitially);
        this.runReadTest(readTestData, false);
        readTestData.clear();
        for (int i = 0; i < this.concurrencyLevel * 5; ++i) {
            ReadTestData localReadData = new ReadTestData();
            int offset = random.nextInt(1048575);
            int length = 0x100000 - offset;
            byte[] expectedData = new byte[length];
            buffered.reset();
            buffered.skip(offset);
            buffered.read(expectedData);
            localReadData.set(createTestData.get((int)0).path, expectedData, offset);
            readTestData.add(localReadData);
        }
        this.runReadTest(readTestData, true);
    }

    void runReadTest(ArrayList<ReadTestData> testData, boolean useSameStream) throws InterruptedException, ExecutionException {
        int i;
        ExecutorService executor = Executors.newFixedThreadPool(testData.size());
        Future[] subtasks = new Future[testData.size()];
        for (i = 0; i < testData.size(); ++i) {
            subtasks[i] = executor.submit(new ReadConcurrentRunnable(testData.get((int)i).data, testData.get((int)i).path, testData.get((int)i).offset, useSameStream));
        }
        executor.shutdown();
        executor.awaitTermination(120L, TimeUnit.SECONDS);
        for (i = 0; i < testData.size(); ++i) {
            Assert.assertTrue((boolean)((Boolean)subtasks[i].get()));
        }
    }

    class ReadConcurrentRunnable
    implements Callable<Boolean> {
        private Path path;
        private int offset;
        private byte[] expectedData;
        private boolean useSameStream;

        public ReadConcurrentRunnable(byte[] expectedData, Path path, int offset, boolean useSameStream) {
            this.path = path;
            this.offset = offset;
            this.expectedData = expectedData;
            this.useSameStream = useSameStream;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Boolean call() throws IOException {
            try {
                FSDataInputStream in;
                if (this.useSameStream) {
                    Object object = LOCK;
                    synchronized (object) {
                        if (commonHandle == null) {
                            commonHandle = TestConcurrentDataReadOperations.this.getMockAdlFileSystem().open(this.path);
                        }
                        in = commonHandle;
                    }
                } else {
                    in = TestConcurrentDataReadOperations.this.getMockAdlFileSystem().open(this.path);
                }
                byte[] actualData = new byte[this.expectedData.length];
                in.readFully((long)this.offset, actualData);
                Assert.assertArrayEquals((String)("Path :" + this.path.toString() + " did not match."), (byte[])this.expectedData, (byte[])actualData);
                if (!this.useSameStream) {
                    in.close();
                }
            }
            catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    }

    class CreateTestData {
        private Path path;
        private byte[] data;

        CreateTestData() {
        }

        public void set(Path filePath, byte[] dataToBeWritten) {
            this.path = filePath;
            this.data = dataToBeWritten;
        }
    }

    class ReadTestData {
        private Path path;
        private byte[] data;
        private int offset;

        ReadTestData() {
        }

        public void set(Path filePath, byte[] dataToBeRead, int fromOffset) {
            this.path = filePath;
            this.data = dataToBeRead;
            this.offset = fromOffset;
        }
    }
}

