/*
 * Decompiled with CFR 0.152.
 */
package org.asynchttpclient.netty.handler;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hive.druid.io.netty.channel.Channel;
import org.apache.hive.druid.io.netty.channel.ChannelFuture;
import org.apache.hive.druid.io.netty.channel.ChannelFutureListener;
import org.apache.hive.druid.io.netty.util.concurrent.GenericFutureListener;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.handler.AsyncHandlerExtensions;
import org.asynchttpclient.netty.handler.StreamedResponsePublisher;
import org.asynchttpclient.netty.request.NettyRequest;
import org.asynchttpclient.reactivestreams.ReactiveStreamsTest;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class NettyReactiveStreamsTest
extends ReactiveStreamsTest {
    /**
     * Checks that a request is retried when its channel is closed while the
     * response body is still being streamed to a subscriber.
     *
     * <p>Steps: execute a POST with a streamed response handler, wait until the
     * subscriber has received a first body part (it then blocks on
     * {@code streamOnHold}), close the channel held by the publisher, release the
     * subscriber, and wait until the handler reports a retry through
     * {@code replayingRequest}.
     *
     * <p>NOTE(review): the latch waits are unbounded, so a missing retry makes this
     * test hang instead of fail; consider bounding them.
     *
     * @throws Exception if a latch wait is interrupted or the channel cannot be
     *         read reflectively from the publisher
     */
    @Test(groups={"standalone"})
    public void testRetryingOnFailingStream() throws Exception {
        try (AsyncHttpClient client = Dsl.asyncHttpClient()) {
            // Counted down by the subscriber when it receives a body part.
            final CountDownLatch streamStarted = new CountDownLatch(1);
            // Keeps the subscriber blocked inside onNext until the channel close was requested.
            final CountDownLatch streamOnHold = new CountDownLatch(1);
            // Counted down by the handler's onRetry callback.
            final CountDownLatch replayingRequest = new CountDownLatch(1);
            // Captures the first publisher handed to onStream so its channel can be closed below.
            final AtomicReference<StreamedResponsePublisher> publisherRef = new AtomicReference<>();

            client.preparePost(this.getTargetUrl())
                    .setBody(TestUtils.LARGE_IMAGE_BYTES)
                    .execute(new ReplayedSimpleAsyncHandler(replayingRequest, new BlockedStreamSubscriber(streamStarted, streamOnHold)) {

                        @Override
                        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
                            if (!(publisher instanceof StreamedResponsePublisher)) {
                                throw new IllegalStateException(String.format("publisher %s is expected to be an instance of %s", publisher, StreamedResponsePublisher.class));
                            }
                            // Only the first publisher is recorded; any later onStream call
                            // (presumably the replayed request) is aborted.
                            if (!publisherRef.compareAndSet(null, (StreamedResponsePublisher) publisher)) {
                                return AsyncHandler.State.ABORT;
                            }
                            return super.onStream(publisher);
                        }
                    });

            // Wait for the subscriber to receive a body part before touching the channel.
            streamStarted.await();
            StreamedResponsePublisher publisher = publisherRef.get();
            Assert.assertNotNull(publisher, "Expected a not null publisher.");

            // Close the channel while the subscriber is still blocked mid-stream.
            final CountDownLatch channelClosed = new CountDownLatch(1);
            this.getChannel(publisher).close().addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    channelClosed.countDown();
                }
            });

            // Let the subscriber continue, then wait for the close to complete.
            streamOnHold.countDown();
            channelClosed.await();

            // Reaching the end of this wait is the success criterion: onRetry was invoked.
            replayingRequest.await();
        }
    }

    /**
     * Reads the field named {@code channel} from the given publisher via reflection.
     *
     * @param publisher the publisher whose runtime class declares the {@code channel} field
     * @return the field's value, cast to {@link Channel}
     * @throws Exception if the field does not exist on the publisher's runtime class
     *         or cannot be read
     */
    private Channel getChannel(StreamedResponsePublisher publisher) throws Exception {
        final Field channelField = publisher.getClass().getDeclaredField("channel");
        // The field is looked up with getDeclaredField, so lift access checks before reading it.
        channelField.setAccessible(true);
        final Object rawChannel = channelField.get(publisher);
        return (Channel) rawChannel;
    }

    private static class ReplayedSimpleAsyncHandler
    extends ReactiveStreamsTest.SimpleStreamedAsyncHandler
    implements AsyncHandlerExtensions {
        private final CountDownLatch replaying;

        public ReplayedSimpleAsyncHandler(CountDownLatch replaying, ReactiveStreamsTest.SimpleSubscriber<HttpResponseBodyPart> subscriber) {
            super(subscriber);
            this.replaying = replaying;
        }

        public void onHostnameResolutionAttempt(String name) {
        }

        public void onHostnameResolutionSuccess(String name, List<InetSocketAddress> addresses) {
        }

        public void onHostnameResolutionFailure(String name, Throwable cause) {
        }

        public void onTcpConnectAttempt(InetSocketAddress address) {
        }

        public void onTcpConnectSuccess(InetSocketAddress address, Channel connection) {
        }

        public void onTcpConnectFailure(InetSocketAddress address, Throwable cause) {
        }

        public void onTlsHandshakeAttempt() {
        }

        public void onTlsHandshakeSuccess() {
        }

        public void onTlsHandshakeFailure(Throwable cause) {
        }

        public void onConnectionPoolAttempt() {
        }

        public void onConnectionPooled(Channel connection) {
        }

        public void onConnectionOffer(Channel connection) {
        }

        public void onRequestSend(NettyRequest request) {
        }

        public void onRetry() {
            this.replaying.countDown();
        }
    }

    /**
     * Subscriber that announces each received body part on {@code streamStarted}
     * and then blocks until {@code streamOnHold} is released, before delegating to
     * the parent {@code onNext}.
     */
    private static class BlockedStreamSubscriber
    extends ReactiveStreamsTest.SimpleSubscriber<HttpResponseBodyPart> {
        private static final Logger LOGGER = LoggerFactory.getLogger(BlockedStreamSubscriber.class);
        /** Counted down at the start of every {@link #onNext} call. */
        private final CountDownLatch streamStarted;
        /** Awaited in {@link #onNext} before the element is passed on. */
        private final CountDownLatch streamOnHold;

        /**
         * @param streamStarted latch counted down when an element arrives
         * @param streamOnHold  latch that must reach zero before an element is processed
         */
        public BlockedStreamSubscriber(CountDownLatch streamStarted, CountDownLatch streamOnHold) {
            this.streamStarted = streamStarted;
            this.streamOnHold = streamOnHold;
        }

        /**
         * Signals {@code streamStarted}, waits on {@code streamOnHold}, then forwards
         * the element to the parent implementation. If the wait is interrupted the
         * error is logged, the interrupt flag is restored, and the element is still
         * forwarded.
         *
         * @param t the body part received from the publisher
         */
        @Override
        public void onNext(HttpResponseBodyPart t) {
            this.streamStarted.countDown();
            try {
                this.streamOnHold.await();
            }
            catch (InterruptedException e) {
                LOGGER.error("`streamOnHold` latch was interrupted", (Throwable)e);
                // Catching InterruptedException clears the flag; restore it so the
                // calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            super.onNext(t);
        }
    }
}

