/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.utils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DecompressorStream;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.ZStandardCodec;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.DummyCompressionCodec;
import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
import org.apache.tez.runtime.library.utils.CodecUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;

public class TestCodecUtils {
    @Test
    public void testConcurrentDecompressorCreationWithModifiedBuffersize() throws Exception {
        this.testConcurrentDecompressorCreationWithModifiedBuffersizeOnCodec((CompressionCodec)new DefaultCodec());
    }

    private void testConcurrentDecompressorCreationWithModifiedBuffersizeOnCodec(CompressionCodec codec) throws InterruptedException, ExecutionException {
        int modifiedBufferSize = 1000;
        int numberOfThreads = 1000;
        ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
        Configuration conf = new Configuration();
        conf.setBoolean("tez.runtime.compress", true);
        ((Configurable)codec).setConf(conf);
        Future[] futures = new Future[numberOfThreads];
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < numberOfThreads; ++i) {
            futures[i] = service.submit(() -> {
                try {
                    this.waitForLatch(latch);
                    Decompressor decompressor = CodecUtils.getDecompressor((CompressionCodec)codec);
                    DecompressorStream stream = (DecompressorStream)CodecUtils.getDecompressedInputStreamWithBufferSize((CompressionCodec)codec, (IFileInputStream)((IFileInputStream)Mockito.mock(IFileInputStream.class)), (Decompressor)decompressor, (int)modifiedBufferSize);
                    Assert.assertEquals((String)"stream buffer size is incorrect", (long)modifiedBufferSize, (long)this.getBufferSize(stream));
                    CodecPool.returnDecompressor((Decompressor)decompressor);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        latch.countDown();
        for (Future f : futures) {
            f.get();
        }
    }

    @Test
    @Ignore
    public void testConcurrentCompressorDecompressorCreation() throws Exception {
        this.testConcurrentCompressorDecompressorCreationOnCodec((CompressionCodec)new DefaultCodec());
    }

    private void testConcurrentCompressorDecompressorCreationOnCodec(CompressionCodec codec) throws IOException, InterruptedException, ExecutionException {
        int modifiedBufferSize = 1000;
        int numberOfThreads = 1000;
        ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
        Configuration conf = new Configuration();
        conf.setBoolean("tez.runtime.compress", true);
        ((Configurable)codec).setConf(conf);
        Future[] futures = new Future[numberOfThreads];
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < numberOfThreads; ++i) {
            if (i % 3 == 0) {
                futures[i] = service.submit(() -> {
                    try {
                        this.waitForLatch(latch);
                        Decompressor decompressor = CodecUtils.getDecompressor((CompressionCodec)codec);
                        CompressionInputStream stream = (CompressionInputStream)CodecUtils.getDecompressedInputStreamWithBufferSize((CompressionCodec)codec, (IFileInputStream)((IFileInputStream)Mockito.mock(IFileInputStream.class)), (Decompressor)decompressor, (int)modifiedBufferSize);
                        Assert.assertEquals((String)"stream buffer size is incorrect", (long)modifiedBufferSize, (long)this.getBufferSize(stream));
                        CodecPool.returnDecompressor((Decompressor)decompressor);
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
                continue;
            }
            if (i % 3 == 1) {
                futures[i] = service.submit(() -> {
                    try {
                        this.waitForLatch(latch);
                        Compressor compressor = CodecUtils.getCompressor((CompressionCodec)codec);
                        CompressionOutputStream stream = CodecUtils.createOutputStream((CompressionCodec)codec, (OutputStream)((OutputStream)Mockito.mock(OutputStream.class)), (Compressor)compressor);
                        Assert.assertEquals((String)"stream buffer size is incorrect", (long)4096L, (long)this.getBufferSize(stream));
                        CodecPool.returnCompressor((Compressor)compressor);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                continue;
            }
            if (i % 3 != 2) continue;
            futures[i] = service.submit(() -> {
                try {
                    this.waitForLatch(latch);
                    Decompressor decompressor = CodecUtils.getDecompressor((CompressionCodec)codec);
                    CompressionInputStream stream = CodecUtils.createInputStream((CompressionCodec)codec, (InputStream)((InputStream)Mockito.mock(InputStream.class)), (Decompressor)decompressor);
                    Assert.assertEquals((String)"stream buffer size is incorrect", (long)4096L, (long)this.getBufferSize(stream));
                    CodecPool.returnDecompressor((Decompressor)decompressor);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        latch.countDown();
        for (Future f : futures) {
            f.get();
        }
    }

    @Test
    public void testDefaultBufferSize() {
        Configuration conf = new Configuration();
        Assert.assertEquals((long)262144L, (long)CodecUtils.getDefaultBufferSize((Configuration)conf, (CompressionCodec)new DummyCompressionCodec()));
        Assert.assertEquals((long)4096L, (long)CodecUtils.getDefaultBufferSize((Configuration)conf, (CompressionCodec)new DefaultCodec()));
        Assert.assertEquals((long)4096L, (long)CodecUtils.getDefaultBufferSize((Configuration)conf, (CompressionCodec)new BZip2Codec()));
        Assert.assertEquals((long)4096L, (long)CodecUtils.getDefaultBufferSize((Configuration)conf, (CompressionCodec)new GzipCodec()));
        Assert.assertEquals((long)262144L, (long)CodecUtils.getDefaultBufferSize((Configuration)conf, (CompressionCodec)new SnappyCodec()));
        Assert.assertEquals((long)0L, (long)CodecUtils.getDefaultBufferSize((Configuration)conf, (CompressionCodec)new ZStandardCodec()));
        Assert.assertEquals((long)262144L, (long)CodecUtils.getDefaultBufferSize((Configuration)conf, (CompressionCodec)new Lz4Codec()));
    }

    private void waitForLatch(CountDownLatch latch) {
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private int getBufferSize(Object stream) {
        try {
            Field field = stream.getClass().getDeclaredField("buffer");
            field.setAccessible(true);
            byte[] buffer = (byte[])field.get(stream);
            return buffer.length;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

