/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.rest;

import io.confluent.rest.Application;
import io.confluent.rest.RestConfig;
import io.confluent.rest.TestRestConfig;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Configurable;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.test.TestUtils;
import org.eclipse.jetty.http.HttpStatus;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestCustomizeThreadPool {
    private static final Logger log = LoggerFactory.getLogger(TestCustomizeThreadPool.class);
    private static Object locker = new Object();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Test
    public void testThreadPoolLessThreshold() throws Exception {
        int numOfClients = 3;
        TestCustomizeThreadPoolApplication app = new TestCustomizeThreadPoolApplication();
        String uri = app.getUri();
        try {
            app.start();
            this.makeConcurrentGetRequests(uri + "/custom/resource", numOfClients, app);
        }
        catch (Exception exception) {
            log.info("Current running thread {}, maximum thread {}.", (Object)app.getServer().getThreads(), (Object)app.getServer().getMaxThreads());
            Assertions.assertTrue((app.getServer().getThreads() - app.getServer().getMaxThreads() < 0 ? 1 : 0) != 0, (String)("Total number of running threads is less than maximum number of threads " + app.getServer().getMaxThreads()));
            log.info("Total jobs in queue {}, capacity of queue {}.", (Object)app.getServer().getQueueSize(), (Object)app.getServer().getQueueCapacity());
            Assertions.assertTrue((app.getServer().getQueueSize() - app.getServer().getQueueCapacity() < 0 ? 1 : 0) != 0, (String)("Total number of jobs in queue is less than capacity of queue " + app.getServer().getQueueCapacity()));
            app.stop();
            catch (Throwable throwable) {
                log.info("Current running thread {}, maximum thread {}.", (Object)app.getServer().getThreads(), (Object)app.getServer().getMaxThreads());
                Assertions.assertTrue((app.getServer().getThreads() - app.getServer().getMaxThreads() < 0 ? 1 : 0) != 0, (String)("Total number of running threads is less than maximum number of threads " + app.getServer().getMaxThreads()));
                log.info("Total jobs in queue {}, capacity of queue {}.", (Object)app.getServer().getQueueSize(), (Object)app.getServer().getQueueCapacity());
                Assertions.assertTrue((app.getServer().getQueueSize() - app.getServer().getQueueCapacity() < 0 ? 1 : 0) != 0, (String)("Total number of jobs in queue is less than capacity of queue " + app.getServer().getQueueCapacity()));
                app.stop();
                throw throwable;
            }
        }
        log.info("Current running thread {}, maximum thread {}.", (Object)app.getServer().getThreads(), (Object)app.getServer().getMaxThreads());
        Assertions.assertTrue((app.getServer().getThreads() - app.getServer().getMaxThreads() < 0 ? 1 : 0) != 0, (String)("Total number of running threads is less than maximum number of threads " + app.getServer().getMaxThreads()));
        log.info("Total jobs in queue {}, capacity of queue {}.", (Object)app.getServer().getQueueSize(), (Object)app.getServer().getQueueCapacity());
        Assertions.assertTrue((app.getServer().getQueueSize() - app.getServer().getQueueCapacity() < 0 ? 1 : 0) != 0, (String)("Total number of jobs in queue is less than capacity of queue " + app.getServer().getQueueCapacity()));
        app.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testThreadPoolReachThreshold() throws Exception {
        int numOfClients = 40;
        TestCustomizeThreadPoolApplication app = new TestCustomizeThreadPoolApplication();
        String uri = app.getUri();
        try {
            app.start();
            this.makeConcurrentGetRequests(uri + "/custom/resource", numOfClients, app);
        }
        catch (Exception exception) {
        }
        finally {
            log.info("Current running thread {}, maximum thread {}.", (Object)app.getServer().getThreads(), (Object)app.getServer().getMaxThreads());
            Assertions.assertEquals((int)app.getServer().getMaxThreads(), (int)app.getServer().getThreads(), (String)"Total number of running threads reach maximum number of threads.");
            app.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testQueueFull() throws Exception {
        int numOfClients = 1;
        Properties props = new Properties();
        props.put("listeners", "http://localhost:8080");
        props.put("thread.pool.min", "2");
        props.put("thread.pool.max", "10");
        props.put("request.queue.capacity.init", "0");
        props.put("request.queue.capacity", "0");
        props.put("request.queue.capacity.growby", "2");
        TestCustomizeThreadPoolApplication app = new TestCustomizeThreadPoolApplication(props);
        String uri = app.getUri();
        try {
            app.start();
            Assertions.assertThrows(RejectedExecutionException.class, () -> this.makeConcurrentGetRequests(uri + "/custom/resource", numOfClients, app));
        }
        finally {
            app.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Disabled
    public void testJettyThreadPoolMetrics() throws Exception {
        RestResource.latch = new CountDownLatch(1);
        TestCustomizeThreadPoolApplication app = new TestCustomizeThreadPoolApplication();
        String uri = app.getUri();
        try {
            app.start();
            Assertions.assertEquals((int)0, (int)TestCustomizeThreadPool.getIntMetricValue(app.metrics, "request-queue-size"));
            int numThread = 18;
            Thread[] threads = this.sendRequests(uri + "/custom/resource", numThread);
            TestUtils.waitForCondition(() -> app.server.getQueueSize() == 8, (String)"Queue is not full");
            Assertions.assertEquals((int)8, (int)TestCustomizeThreadPool.getIntMetricValue(app.metrics, "request-queue-size"));
            Assertions.assertEquals((int)10, (int)TestCustomizeThreadPool.getIntMetricValue(app.metrics, "busy-thread-count"));
            Assertions.assertEquals((double)1.0, (double)TestCustomizeThreadPool.getDoubleMetricValue(app.metrics, "thread-pool-usage"), (double)0.0);
            RestResource.latch.countDown();
            for (int i = 0; i < numThread; ++i) {
                threads[i].join();
            }
            TestUtils.waitForCondition(() -> app.server.getQueueSize() == 0, (String)"Queue is not empty");
            Assertions.assertEquals((int)0, (int)TestCustomizeThreadPool.getIntMetricValue(app.metrics, "request-queue-size"));
            Assertions.assertTrue((TestCustomizeThreadPool.getDoubleMetricValue(app.metrics, "thread-pool-usage") > 0.0 ? 1 : 0) != 0);
            Assertions.assertTrue((TestCustomizeThreadPool.getDoubleMetricValue(app.metrics, "thread-pool-usage") < 1.0 ? 1 : 0) != 0);
        }
        finally {
            RestResource.latch = null;
            app.stop();
        }
    }

    public static int getIntMetricValue(Metrics metrics, String attribute) {
        Map allMetrics = metrics.metrics();
        Optional<KafkaMetric> metric = allMetrics.entrySet().stream().filter(m -> ((MetricName)m.getKey()).name().equals(attribute)).map(Map.Entry::getValue).findFirst();
        return metric.isPresent() ? (Integer)metric.get().metricValue() : -1;
    }

    public static double getDoubleMetricValue(Metrics metrics, String attribute) {
        Map allMetrics = metrics.metrics();
        Optional<KafkaMetric> metric = allMetrics.entrySet().stream().filter(m -> ((MetricName)m.getKey()).name().equals(attribute)).map(Map.Entry::getValue).findFirst();
        return metric.isPresent() ? (Double)metric.get().metricValue() : -1.0;
    }

    private void makeConcurrentGetRequests(String uri, int numThread, TestCustomizeThreadPoolApplication app) throws Exception {
        Thread[] threads = this.sendRequests(uri, numThread);
        long startingTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startingTime < 360000L) {
            log.info("Queue size {}, queue capacity {} ", (Object)app.getServer().getQueueSize(), (Object)app.getServer().getQueueCapacity());
            Assertions.assertTrue((app.getServer().getQueueSize() <= app.getServer().getQueueCapacity() ? 1 : 0) != 0, (String)"Number of jobs in queue is not more than capacity of queue ");
            Thread.sleep(2000L);
            if (app.getServer().getQueueSize() != 0) continue;
        }
        for (int i = 0; i < numThread; ++i) {
            threads[i].join();
        }
        log.info("End queue size {}, queue capacity {} ", (Object)app.getServer().getQueueSize(), (Object)app.getServer().getQueueCapacity());
    }

    private Thread[] sendRequests(final String uri, int numThread) {
        Thread[] threads = new Thread[numThread];
        for (int i = 0; i < numThread; ++i) {
            threads[i] = new Thread(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    HttpGet httpget = new HttpGet(uri);
                    CloseableHttpClient httpclient = HttpClients.createDefault();
                    CloseableHttpResponse response = null;
                    try {
                        response = httpclient.execute((HttpUriRequest)httpget);
                        HttpStatus.Code statusCode = HttpStatus.getCode((int)response.getStatusLine().getStatusCode());
                        log.info("Status code {}, reason {} ", (Object)statusCode, (Object)response.getStatusLine().getReasonPhrase());
                        MatcherAssert.assertThat((Object)statusCode, (Matcher)CoreMatchers.is((Object)HttpStatus.Code.OK));
                    }
                    catch (Exception exception) {
                    }
                    finally {
                        try {
                            if (response != null) {
                                response.close();
                            }
                            httpclient.close();
                        }
                        catch (Exception exception) {}
                    }
                }
            };
            threads[i].start();
        }
        return threads;
    }

    private static class TestCustomizeThreadPoolApplication
    extends Application<TestRestConfig> {
        static Properties props = null;

        public TestCustomizeThreadPoolApplication() {
            super((RestConfig)TestCustomizeThreadPoolApplication.createConfig());
        }

        public TestCustomizeThreadPoolApplication(Properties props) {
            super((RestConfig)TestRestConfig.maprCompatible(props));
            TestCustomizeThreadPoolApplication.props = props;
        }

        public void setupResources(Configurable<?> config, TestRestConfig appConfig) {
            config.register((Object)new RestResource());
        }

        public String getUri() {
            return (String)props.get("listeners");
        }

        private static TestRestConfig createConfig() {
            props = new Properties();
            String uri = "http://localhost:8080";
            props.put("listeners", uri);
            props.put("thread.pool.min", "2");
            props.put("thread.pool.max", "10");
            props.put("request.queue.capacity.init", "2");
            props.put("request.queue.capacity", "8");
            props.put("request.queue.capacity.growby", "2");
            return TestRestConfig.maprCompatible(props);
        }
    }

    @Path(value="/custom")
    @Produces(value={"text/plain"})
    public static class RestResource {
        static CountDownLatch latch = null;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @GET
        @Path(value="/resource")
        public String get() throws InterruptedException {
            if (latch == null) {
                Object object = locker;
                synchronized (object) {
                    try {
                        locker.wait(10000L);
                    }
                    catch (Exception e) {
                        log.info(e.getMessage());
                    }
                }
            }
            latch.await();
            return "ThreadPool";
        }
    }
}

