/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

public class TestTpchDistributedConcurrent
extends BaseTestQuery {
    @Rule
    public final TestRule TIMEOUT = TestTools.getTimeoutRule((int)120000);
    private static final String[] queryFile = new String[]{"queries/tpch/01.sql", "queries/tpch/03.sql", "queries/tpch/04.sql", "queries/tpch/05.sql", "queries/tpch/06.sql", "queries/tpch/07.sql", "queries/tpch/08.sql", "queries/tpch/09.sql", "queries/tpch/10.sql", "queries/tpch/11.sql", "queries/tpch/12.sql", "queries/tpch/13.sql", "queries/tpch/14.sql", "queries/tpch/16.sql", "queries/tpch/18.sql", "queries/tpch/19_1.sql", "queries/tpch/20.sql"};
    private static final int TOTAL_QUERIES = 115;
    private static final int CONCURRENT_QUERIES = 15;
    private static final Random random = new Random(-559038737L);
    private static final String alterSession = "alter session set `planner.slice_target` = 10";
    private int remainingQueries = 100;
    private final Semaphore completionSemaphore = new Semaphore(0);
    private final Semaphore submissionSemaphore = new Semaphore(0);
    private final Set<UserResultsListener> listeners = Sets.newIdentityHashSet();
    private Thread testThread = null;
    private final List<FailedQuery> failedQueries = new LinkedList<FailedQuery>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void submitRandomQuery() {
        String query;
        String filename = queryFile[random.nextInt(queryFile.length)];
        try {
            query = QueryTestUtil.normalizeQuery(TestTpchDistributedConcurrent.getFile(filename)).replace(';', ' ');
        }
        catch (IOException e) {
            throw new RuntimeException("Caught exception", e);
        }
        ChainingSilentListener listener = new ChainingSilentListener(query);
        client.runQuery(UserBitShared.QueryType.SQL, query, (UserResultsListener)listener);
        TestTpchDistributedConcurrent testTpchDistributedConcurrent = this;
        synchronized (testTpchDistributedConcurrent) {
            this.listeners.add(listener);
        }
    }

    @Test
    public void testConcurrentQueries() throws Exception {
        QueryTestUtil.testRunAndPrint(client, UserBitShared.QueryType.SQL, alterSession);
        this.testThread = Thread.currentThread();
        QuerySubmitter querySubmitter = new QuerySubmitter();
        querySubmitter.start();
        this.submissionSemaphore.release(15);
        InterruptedException interruptedException = null;
        try {
            this.completionSemaphore.acquire(115);
        }
        catch (InterruptedException e) {
            interruptedException = e;
            for (FailedQuery fq : this.failedQueries) {
                System.err.println(String.format("%s failed with %s", new Object[]{fq.queryFile, fq.userEx}));
            }
        }
        querySubmitter.interrupt();
        Assert.assertNull((String)"Query error caused interruption", (Object)interruptedException);
        int nListeners = this.listeners.size();
        Assert.assertEquals((String)(nListeners + " listeners still exist"), (long)0L, (long)nListeners);
        Assert.assertEquals((String)"Didn't submit all queries", (long)0L, (long)this.remainingQueries);
        Assert.assertEquals((String)"Queries failed", (long)0L, (long)this.failedQueries.size());
    }

    private class QuerySubmitter
    extends Thread {
        private QuerySubmitter() {
        }

        @Override
        public void run() {
            while (true) {
                try {
                    TestTpchDistributedConcurrent.this.submissionSemaphore.acquire();
                }
                catch (InterruptedException e) {
                    System.out.println("QuerySubmitter quitting.");
                    return;
                }
                TestTpchDistributedConcurrent.this.submitRandomQuery();
            }
        }
    }

    private class ChainingSilentListener
    extends BaseTestQuery.SilentListener {
        private final String query;

        public ChainingSilentListener(String query) {
            this.query = query;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
            super.queryCompleted(state);
            TestTpchDistributedConcurrent.this.completionSemaphore.release();
            TestTpchDistributedConcurrent testTpchDistributedConcurrent = TestTpchDistributedConcurrent.this;
            synchronized (testTpchDistributedConcurrent) {
                Boolean object = TestTpchDistributedConcurrent.this.listeners.remove(this);
                Assert.assertNotNull((String)"listener not found", (Object)object);
                if (TestTpchDistributedConcurrent.this.failedQueries.size() == 0 && TestTpchDistributedConcurrent.this.remainingQueries > 0) {
                    TestTpchDistributedConcurrent.this.submissionSemaphore.release();
                    --TestTpchDistributedConcurrent.this.remainingQueries;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void submissionFailed(UserException uex) {
            super.submissionFailed(uex);
            TestTpchDistributedConcurrent.this.completionSemaphore.release();
            System.out.println("submissionFailed for " + this.query + "\nwith " + (Object)((Object)uex));
            TestTpchDistributedConcurrent testTpchDistributedConcurrent = TestTpchDistributedConcurrent.this;
            synchronized (testTpchDistributedConcurrent) {
                Boolean object = TestTpchDistributedConcurrent.this.listeners.remove(this);
                Assert.assertNotNull((String)"listener not found", (Object)object);
                TestTpchDistributedConcurrent.this.failedQueries.add(new FailedQuery(this.query, uex));
                TestTpchDistributedConcurrent.this.testThread.interrupt();
            }
        }
    }

    private static class FailedQuery {
        final String queryFile;
        final UserException userEx;

        public FailedQuery(String queryFile, UserException userEx) {
            this.queryFile = queryFile;
            this.userEx = userEx;
        }
    }
}

