/*
 * Decompiled with CFR 0.152.
 */
package org.asynchttpclient.reactivestreams;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseHeaders;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.handler.StreamedAsyncHandler;
import org.asynchttpclient.reactivestreams.HttpStaticFileServer;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class ReactiveStreamsDownLoadTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsDownLoadTest.class);
    private int serverPort = 8080;
    private File largeFile;
    private File smallFile;

    @BeforeClass(alwaysRun=true)
    public void setUpBeforeTest() throws Exception {
        this.largeFile = TestUtils.createTempFile(15360);
        this.smallFile = TestUtils.createTempFile(20);
        HttpStaticFileServer.start(this.serverPort);
    }

    @AfterClass(alwaysRun=true)
    public void tearDown() throws Exception {
        HttpStaticFileServer.shutdown();
    }

    @Test(groups={"standalone"})
    public void streamedResponseLargeFileTest() throws Throwable {
        try (AsyncHttpClient c = Dsl.asyncHttpClient();){
            String largeFileName = "http://localhost:" + this.serverPort + "/" + this.largeFile.getName();
            ListenableFuture future = c.prepareGet(largeFileName).execute((AsyncHandler)new SimpleStreamedAsyncHandler());
            byte[] result = ((SimpleStreamedAsyncHandler)future.get()).getBytes();
            Assert.assertEquals((long)result.length, (long)this.largeFile.length());
        }
    }

    @Test(groups={"standalone"})
    public void streamedResponseSmallFileTest() throws Throwable {
        try (AsyncHttpClient c = Dsl.asyncHttpClient();){
            String smallFileName = "http://localhost:" + this.serverPort + "/" + this.smallFile.getName();
            ListenableFuture future = c.prepareGet(smallFileName).execute((AsyncHandler)new SimpleStreamedAsyncHandler());
            byte[] result = ((SimpleStreamedAsyncHandler)future.get()).getBytes();
            LOGGER.debug("Result file size: " + result.length);
            Assert.assertEquals((long)result.length, (long)this.smallFile.length());
        }
    }

    protected static class SimpleSubscriber<T>
    implements Subscriber<T> {
        private volatile Subscription subscription;
        private volatile Throwable error;
        private final List<T> elements = Collections.synchronizedList(new ArrayList());
        private final CountDownLatch latch = new CountDownLatch(1);

        protected SimpleSubscriber() {
        }

        public void onSubscribe(Subscription subscription) {
            LOGGER.debug("SimpleSubscriber onSubscribe");
            this.subscription = subscription;
            subscription.request(1L);
        }

        public void onNext(T t) {
            LOGGER.debug("SimpleSubscriber onNext");
            this.elements.add(t);
            this.subscription.request(1L);
        }

        public void onError(Throwable error) {
            LOGGER.error("SimpleSubscriber onError");
            this.error = error;
            this.latch.countDown();
        }

        public void onComplete() {
            LOGGER.debug("SimpleSubscriber onComplete");
            this.latch.countDown();
        }

        public List<T> getElements() throws Throwable {
            this.latch.await();
            if (this.error != null) {
                throw this.error;
            }
            return this.elements;
        }
    }

    protected static class SimpleStreamedAsyncHandler
    implements StreamedAsyncHandler<SimpleStreamedAsyncHandler> {
        private final SimpleSubscriber<HttpResponseBodyPart> subscriber;

        public SimpleStreamedAsyncHandler() {
            this(new SimpleSubscriber<HttpResponseBodyPart>());
        }

        public SimpleStreamedAsyncHandler(SimpleSubscriber<HttpResponseBodyPart> subscriber) {
            this.subscriber = subscriber;
        }

        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onStream");
            publisher.subscribe(this.subscriber);
            return AsyncHandler.State.CONTINUE;
        }

        public void onThrowable(Throwable t) {
            throw new AssertionError((Object)t);
        }

        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
            LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onBodyPartReceived");
            throw new AssertionError((Object)"Should not have received body part");
        }

        public AsyncHandler.State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onHeadersReceived(HttpResponseHeaders headers) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        public SimpleStreamedAsyncHandler onCompleted() throws Exception {
            LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onSubscribe");
            return this;
        }

        public byte[] getBytes() throws Throwable {
            List<HttpResponseBodyPart> bodyParts = this.subscriber.getElements();
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            for (HttpResponseBodyPart part : bodyParts) {
                bytes.write(part.getBodyPartBytes());
            }
            return bytes.toByteArray();
        }
    }
}

