/*
 * Decompiled with CFR 0.152.
 */
package org.asynchttpclient.reactivestreams;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseHeaders;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.StreamedAsyncHandler;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.testng.Assert;
import org.testng.annotations.Test;
import rx.Observable;
import rx.RxReactiveStreams;

/**
 * Tests that exercise the Reactive Streams integration of the HTTP client:
 * request bodies supplied as a {@link Publisher}, and response bodies consumed
 * through a {@link StreamedAsyncHandler}.
 *
 * <p>Every test sends its request to {@code getTargetUrl()} and, where it
 * inspects the response, expects the body that was sent to come back unchanged.
 */
public class ReactiveStreamsTest extends AbstractBasicTest {

    /**
     * Request timeout handed to the client configuration by the publisher-upload tests.
     * NOTE(review): presumably milliseconds -- confirm against the config builder.
     */
    private static final int UPLOAD_REQUEST_TIMEOUT = 600000;

    /** Creates a client configured with {@link #UPLOAD_REQUEST_TIMEOUT}; the caller must close it. */
    private static AsyncHttpClient newUploadClient() {
        return Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(UPLOAD_REQUEST_TIMEOUT));
    }

    /**
     * Asserts that {@code response} has status 200 and a body equal to
     * {@code TestUtils.LARGE_IMAGE_BYTES}.
     */
    private static void assertLargeImageEchoed(Response response) throws Exception {
        Assert.assertEquals(response.getStatusCode(), 200);
        // The explicit Object casts keep the call bound to assertEquals(Object, Object),
        // exactly as in the original code.
        Assert.assertEquals((Object) response.getResponseBodyAsBytes(), (Object) TestUtils.LARGE_IMAGE_BYTES);
    }

    /** PUTs the large image from a publisher and checks that it is returned intact. */
    @Test(groups = {"standalone"})
    public void testStreamingPutImage() throws Exception {
        try (AsyncHttpClient client = newUploadClient()) {
            Response response = client.preparePut(getTargetUrl())
                    .setBody(TestUtils.LARGE_IMAGE_PUBLISHER)
                    .execute()
                    .get();
            assertLargeImageEchoed(response);
        }
    }

    /**
     * Executes the same publisher-backed PUT twice through one request builder and
     * checks both responses. Currently disabled ({@code enabled = false}).
     */
    @Test(groups = {"standalone"}, enabled = false)
    public void testConnectionDoesNotGetClosed() throws Exception {
        try (AsyncHttpClient client = newUploadClient()) {
            BoundRequestBuilder requestBuilder =
                    client.preparePut(getTargetUrl()).setBody(TestUtils.LARGE_IMAGE_PUBLISHER);

            assertLargeImageEchoed(requestBuilder.execute().get());
            assertLargeImageEchoed(requestBuilder.execute().get());
        }
    }

    /**
     * Sends a request whose body publisher immediately signals an error and expects
     * the future to fail with an {@link ExecutionException}.
     */
    @Test(groups = {"standalone"}, expectedExceptions = {ExecutionException.class})
    public void testFailingStream() throws Exception {
        try (AsyncHttpClient client = newUploadClient()) {
            // NOTE(review): the element type of the observable/publisher is not visible in
            // this file, so both locals are left raw -- restore the type argument that
            // setBody(Publisher) expects once it has been confirmed.
            Observable failingObservable = Observable.error(new FailedStream());
            Publisher failingPublisher = RxReactiveStreams.toPublisher(failingObservable);
            // Cast retained because the argument above is raw.
            ((BoundRequestBuilder) client.preparePut(getTargetUrl()).setBody(failingPublisher)).execute().get();
        }
    }

    /**
     * Streams the response body twice through {@link SimpleStreamedAsyncHandler} and
     * then performs a plain request on the same client.
     */
    @Test(groups = {"standalone"})
    public void streamedResponseTest() throws Throwable {
        try (AsyncHttpClient c = Dsl.asyncHttpClient()) {
            ListenableFuture<SimpleStreamedAsyncHandler> future = c.preparePost(getTargetUrl())
                    .setBody(TestUtils.LARGE_IMAGE_BYTES)
                    .execute(new SimpleStreamedAsyncHandler());
            Assert.assertEquals((Object) future.get().getBytes(), (Object) TestUtils.LARGE_IMAGE_BYTES);

            // Run it again on the same client.
            future = c.preparePost(getTargetUrl())
                    .setBody(TestUtils.LARGE_IMAGE_BYTES)
                    .execute(new SimpleStreamedAsyncHandler());
            Assert.assertEquals((Object) future.get().getBytes(), (Object) TestUtils.LARGE_IMAGE_BYTES);

            // A non-streamed request issued afterwards must still return its body.
            String echoed = c.preparePost(getTargetUrl()).setBody("Hello").execute().get().getResponseBody();
            Assert.assertEquals(echoed, "Hello");
        }
    }

    /**
     * Cancels the streamed response after 0, 1 and 10 body parts, then performs a
     * plain request on the same client.
     */
    @Test(groups = {"standalone"})
    public void cancelStreamedResponseTest() throws Throwable {
        try (AsyncHttpClient c = Dsl.asyncHttpClient()) {
            // Same requests, in the same order, as three hand-written calls would issue.
            for (int cancelAfter : new int[] {0, 1, 10}) {
                c.preparePost(getTargetUrl())
                        .setBody(TestUtils.LARGE_IMAGE_BYTES)
                        .execute(new CancellingStreamedAsyncProvider(cancelAfter))
                        .get();
            }

            // A non-streamed request issued afterwards must still return its body.
            String echoed = c.preparePost(getTargetUrl()).setBody("Hello").execute().get().getResponseBody();
            Assert.assertEquals(echoed, "Hello");
        }
    }

    /**
     * Subscriber that requests elements one at a time and cancels its subscription
     * once {@code cancelAfter} elements have been received (immediately on
     * subscription when {@code cancelAfter} is zero).
     *
     * @param <T> element type
     */
    static class CancellingSubscriber<T> implements Subscriber<T> {
        private final int cancelAfter;
        private volatile Subscription subscription;
        private volatile int count;

        /**
         * @param cancelAfter number of elements to accept before cancelling
         */
        public CancellingSubscriber(int cancelAfter) {
            this.cancelAfter = cancelAfter;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            if (cancelAfter == 0) {
                subscription.cancel();
            } else {
                subscription.request(1L);
            }
        }

        @Override
        public void onNext(T t) {
            // NOTE(review): the increment of a volatile field is not atomic; this assumes
            // the publisher delivers onNext signals serially -- confirm.
            ++count;
            if (count == cancelAfter) {
                subscription.cancel();
            } else {
                subscription.request(1L);
            }
        }

        @Override
        public void onError(Throwable error) {
            // Intentionally ignored: this subscriber only drives cancellation.
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    }

    /**
     * Streamed handler that subscribes a {@link CancellingSubscriber} to the response
     * body and returns itself on completion.
     */
    static class CancellingStreamedAsyncProvider implements StreamedAsyncHandler<CancellingStreamedAsyncProvider> {
        private final int cancelAfter;

        /**
         * @param cancelAfter number of body parts the subscriber accepts before cancelling
         */
        public CancellingStreamedAsyncProvider(int cancelAfter) {
            this.cancelAfter = cancelAfter;
        }

        @Override
        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            publisher.subscribe(new CancellingSubscriber<HttpResponseBodyPart>(cancelAfter));
            return AsyncHandler.State.CONTINUE;
        }

        @Override
        public void onThrowable(Throwable t) {
            // AssertionError(Object) records a Throwable argument as the cause.
            throw new AssertionError(t);
        }

        @Override
        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
            throw new AssertionError("Should not have received body part");
        }

        @Override
        public AsyncHandler.State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        @Override
        public AsyncHandler.State onHeadersReceived(HttpResponseHeaders headers) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        @Override
        public CancellingStreamedAsyncProvider onCompleted() throws Exception {
            return this;
        }
    }

    /**
     * Subscriber that collects every element, requesting one at a time, and lets a
     * caller block until the stream has terminated.
     *
     * @param <T> element type
     */
    protected static class SimpleSubscriber<T> implements Subscriber<T> {
        private volatile Subscription subscription;
        private volatile Throwable error;
        private final List<T> elements = Collections.synchronizedList(new ArrayList<T>());
        /** Released by {@link #onError} or {@link #onComplete}. */
        private final CountDownLatch latch = new CountDownLatch(1);

        protected SimpleSubscriber() {
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        @Override
        public void onNext(T t) {
            elements.add(t);
            subscription.request(1L);
        }

        @Override
        public void onError(Throwable error) {
            this.error = error;
            latch.countDown();
        }

        @Override
        public void onComplete() {
            latch.countDown();
        }

        /**
         * Blocks, without a timeout, until the stream has completed or failed.
         *
         * @return the collected elements (the internal synchronized list, not a copy)
         * @throws Throwable the error signalled through {@link #onError}, or
         *         {@link InterruptedException} if interrupted while waiting
         */
        public List<T> getElements() throws Throwable {
            latch.await();
            if (error != null) {
                throw error;
            }
            return elements;
        }
    }

    /**
     * Streamed handler that feeds the response body into a {@link SimpleSubscriber}
     * and exposes the concatenated bytes.
     */
    protected static class SimpleStreamedAsyncHandler implements StreamedAsyncHandler<SimpleStreamedAsyncHandler> {
        private final SimpleSubscriber<HttpResponseBodyPart> subscriber;

        /** Creates a handler backed by a fresh {@link SimpleSubscriber}. */
        public SimpleStreamedAsyncHandler() {
            this(new SimpleSubscriber<HttpResponseBodyPart>());
        }

        /**
         * @param subscriber subscriber that will receive the response body parts
         */
        public SimpleStreamedAsyncHandler(SimpleSubscriber<HttpResponseBodyPart> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            publisher.subscribe(subscriber);
            return AsyncHandler.State.CONTINUE;
        }

        @Override
        public void onThrowable(Throwable t) {
            // AssertionError(Object) records a Throwable argument as the cause.
            throw new AssertionError(t);
        }

        @Override
        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
            throw new AssertionError("Should not have received body part");
        }

        @Override
        public AsyncHandler.State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        @Override
        public AsyncHandler.State onHeadersReceived(HttpResponseHeaders headers) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        @Override
        public SimpleStreamedAsyncHandler onCompleted() throws Exception {
            return this;
        }

        /**
         * Waits for the stream to terminate and concatenates all received body parts.
         *
         * @return the full response body
         * @throws Throwable whatever {@link SimpleSubscriber#getElements()} throws, or an
         *         I/O error while buffering
         */
        public byte[] getBytes() throws Throwable {
            List<HttpResponseBodyPart> bodyParts = subscriber.getElements();
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            for (HttpResponseBodyPart part : bodyParts) {
                bytes.write(part.getBodyPartBytes());
            }
            return bytes.toByteArray();
        }
    }

    /**
     * Marker exception emitted by the failing publisher in {@code testFailingStream}.
     * Declared static so an instance does not hold a reference to the enclosing test.
     */
    private static class FailedStream extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }
}

