/*
 * Decompiled with CFR 0.152.
 */
package org.asynchttpclient;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseHeaders;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.test.TestUtils;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class RC1KTest
extends AbstractBasicTest {
    private static final int C1K = 1000;
    private static final String ARG_HEADER = "Arg";
    private static final int SRV_COUNT = 10;
    protected Server[] servers = new Server[10];
    private int[] ports = new int[10];

    @Override
    @BeforeClass(alwaysRun=true)
    public void setUpGlobal() throws Exception {
        this.ports = new int[10];
        for (int i = 0; i < 10; ++i) {
            Server server = new Server();
            ServerConnector connector = TestUtils.addHttpConnector(server);
            server.setHandler((Handler)this.configureHandler());
            server.start();
            this.servers[i] = server;
            this.ports[i] = connector.getLocalPort();
        }
        this.logger.info("Local HTTP servers started successfully");
    }

    @Override
    @AfterClass(alwaysRun=true)
    public void tearDownGlobal() throws Exception {
        for (Server srv : this.servers) {
            srv.stop();
        }
    }

    @Override
    public AbstractHandler configureHandler() throws Exception {
        return new AbstractHandler(){

            public void handle(String s, Request r, HttpServletRequest req, HttpServletResponse resp) throws IOException, ServletException {
                resp.setContentType("text/pain");
                String arg = s.substring(1);
                resp.setHeader(RC1KTest.ARG_HEADER, arg);
                resp.setStatus(200);
                resp.getOutputStream().print(arg);
                resp.getOutputStream().flush();
                resp.getOutputStream().close();
            }
        };
    }

    @Test(timeOut=600000L, groups={"scalability"})
    public void rc10kProblem() throws IOException, ExecutionException, TimeoutException, InterruptedException {
        try (AsyncHttpClient ahc = Dsl.asyncHttpClient((DefaultAsyncHttpClientConfig.Builder)Dsl.config().setMaxConnectionsPerHost(1000).setKeepAlive(true));){
            ArrayList<ListenableFuture> resps = new ArrayList<ListenableFuture>(1000);
            int i = 0;
            while (i < 1000) {
                resps.add(ahc.prepareGet(String.format("http://localhost:%d/%d", this.ports[i % 10], i)).execute((AsyncHandler)new MyAsyncHandler(i++)));
            }
            i = 0;
            for (Future future : resps) {
                Integer resp = (Integer)future.get();
                Assert.assertNotNull((Object)resp);
                Assert.assertEquals((int)resp, (int)i++);
            }
        }
    }

    private class MyAsyncHandler
    implements AsyncHandler<Integer> {
        private String arg;
        private AtomicInteger result = new AtomicInteger(-1);

        public MyAsyncHandler(int i) {
            this.arg = String.format("%d", i);
        }

        public void onThrowable(Throwable t) {
            RC1KTest.this.logger.warn("onThrowable called.", t);
        }

        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart event) throws Exception {
            String s = new String(event.getBodyPartBytes());
            this.result.compareAndSet(-1, new Integer(s.trim().equals("") ? "-1" : s));
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onStatusReceived(HttpResponseStatus event) throws Exception {
            Assert.assertEquals((int)event.getStatusCode(), (int)200);
            return AsyncHandler.State.CONTINUE;
        }

        public AsyncHandler.State onHeadersReceived(HttpResponseHeaders event) throws Exception {
            Assert.assertEquals((String)event.getHeaders().get(RC1KTest.ARG_HEADER), (String)this.arg);
            return AsyncHandler.State.CONTINUE;
        }

        public Integer onCompleted() throws Exception {
            return this.result.get();
        }
    }
}

