/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark.session;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionImpl;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.util.StringUtils;
import org.apache.spark.SparkConf;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit tests for {@link SparkSessionManagerImpl} and {@link SparkSessionImpl}.
 *
 * <p>Every test configures {@code spark.master=local} on a fresh {@link HiveConf} and
 * obtains its sessions from the manager returned by
 * {@link SparkSessionManagerImpl#getInstance()}. Because that manager instance is shared
 * between tests, each test releases what it acquired in a {@code finally} block so that a
 * failing assertion does not leave an open session behind for the next test.
 */
public class TestSparkSessionManagerImpl {
    private static final Logger LOG = LoggerFactory.getLogger(TestSparkSessionManagerImpl.class);

    /** Number of concurrent worker threads started by {@link #testMultiSessionMultipleUse()}. */
    private static final int SESSION_THREAD_COUNT = 10;

    /** Number of get/return cycles each {@link SessionThread} performs. */
    private static final int SESSION_USES_PER_THREAD = 5;

    /** Manager shared between the test thread and the {@link SessionThread} workers. */
    private SparkSessionManagerImpl sessionManagerHS2 = null;

    /**
     * Set by any {@link SessionThread} that fails. It is only read by the test thread after
     * {@link Thread#join()} has returned for every worker, which is what makes the write
     * visible to the reader.
     */
    private boolean anyFailedSessionThread;

    /**
     * Verifies that passing an already obtained session back into {@code getSession}
     * yields the very same, still open, session object.
     */
    @Test
    public void testSingleSessionMultipleUse() throws Exception {
        HiveConf conf = newLocalSparkConf();
        SparkSessionManagerImpl sessionManager = SparkSessionManagerImpl.getInstance();
        SparkSession sparkSession1 = sessionManager.getSession(null, conf, true);
        try {
            Assert.assertTrue(sparkSession1.isOpen());

            SparkSession sparkSession2 = sessionManager.getSession(sparkSession1, conf, true);
            // Same session object is expected, not merely an equal one.
            Assert.assertSame(sparkSession1, sparkSession2);
            Assert.assertTrue(sparkSession2.isOpen());
        } finally {
            // Clean up even when an assertion above failed; the shutdown-then-close order
            // is the one this test has always used.
            sessionManager.shutdown();
            sessionManager.closeSession(sparkSession1);
        }
    }

    /**
     * Runs {@link #SESSION_THREAD_COUNT} {@link SessionThread} workers concurrently against
     * one manager and fails if any of them reported an error.
     */
    @Test
    public void testMultiSessionMultipleUse() throws Exception {
        this.sessionManagerHS2 = SparkSessionManagerImpl.getInstance();
        // Shut down first, then set the manager up again with this test's configuration.
        this.sessionManagerHS2.shutdown();
        this.sessionManagerHS2.setup(newLocalSparkConf());
        try {
            ArrayList<Thread> threadList = new ArrayList<Thread>();
            for (int i = 0; i < SESSION_THREAD_COUNT; ++i) {
                Thread t = new Thread(new SessionThread(), "Session thread " + i);
                t.start();
                threadList.add(t);
            }
            for (Thread t : threadList) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    String msg = "Interrupted while waiting for test session threads.";
                    LOG.error(msg, e);
                    Assert.fail(msg);
                }
            }
            Assert.assertFalse(
                "At least one of the session threads failed. See the test output for details.",
                this.anyFailedSessionThread);
        } finally {
            // Always shut the manager down, including when a worker failed.
            System.out.println("Ending SessionManagerHS2");
            this.sessionManagerHS2.shutdown();
        }
    }

    /**
     * Verifies the value of {@code spark.hadoop.cloneConf} seen in the session's
     * {@link SparkConf}: {@code "true"} when the property is absent from the
     * {@link HiveConf}, and the explicitly configured value otherwise.
     */
    @Test
    public void testForceConfCloning() throws Exception {
        HiveConf conf = newLocalSparkConf();
        String sparkCloneConfiguration = "spark.hadoop.cloneConf";

        // Not set at all: the Spark configuration is expected to report "true".
        conf.unset(sparkCloneConfiguration);
        Assert.assertNull("Could not clear " + sparkCloneConfiguration + " in HiveConf",
            conf.get(sparkCloneConfiguration));
        this.checkSparkConf(conf, sparkCloneConfiguration, "true");

        // Explicit values must be passed through unchanged.
        conf.set(sparkCloneConfiguration, "false");
        this.checkSparkConf(conf, sparkCloneConfiguration, "false");

        conf.set(sparkCloneConfiguration, "true");
        this.checkSparkConf(conf, sparkCloneConfiguration, "true");
    }

    /**
     * Verifies that {@link SparkSessionImpl#getHiveException(Throwable)} maps a range of
     * client-creation failures to the expected {@link ErrorMsg} and, where applicable, that
     * the session reports the expected matched fragment of the failure message.
     */
    @Test
    public void testGetHiveException() throws Exception {
        HiveConf conf = newLocalSparkConf();
        SparkSessionManagerImpl ssm = SparkSessionManagerImpl.getInstance();
        SparkSessionImpl ss = (SparkSessionImpl) ssm.getSession(null, conf, true);
        try {
            // Classification by exception type.
            this.checkHiveException(ss, new TimeoutException(),
                ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
            this.checkHiveException(ss, new InterruptedException(),
                ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED);

            // Classification by the content of the exception message.
            this.checkHiveException(ss,
                new RuntimeException("\t diagnostics: Application application_1508358311878_3322732 failed 1 times due to ApplicationMaster for attempt appattempt_1508358311878_3322732_000001 timed out. Failing the application."),
                ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
            this.checkHiveException(ss,
                new RuntimeException("\t diagnostics: Application application_1508358311878_3330000 submitted by user hive to unknown queue: foo"),
                ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE,
                "submitted by user hive to unknown queue: foo");
            this.checkHiveException(ss,
                new RuntimeException("\t diagnostics: org.apache.hadoop.security.AccessControlException: Queue root.foo is STOPPED. Cannot accept submission of application: application_1508358311878_3369187"),
                ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE,
                "Queue root.foo is STOPPED");
            this.checkHiveException(ss,
                new RuntimeException("\t diagnostics: org.apache.hadoop.security.AccessControlException: Queue root.foo already has 10 applications, cannot accept submission of application: application_1508358311878_3384544"),
                ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL,
                "Queue root.foo already has 10 applications");
            this.checkHiveException(ss,
                new RuntimeException("Exception in thread \"\"main\"\" java.lang.IllegalArgumentException: Required executor memory (7168+10240 MB) is above the max threshold (16384 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'."),
                ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
                "Required executor memory (7168+10240 MB) is above the max threshold (16384 MB)");
            this.checkHiveException(ss,
                new RuntimeException("Exception in thread \"\"main\"\" java.lang.IllegalArgumentException: requirement failed: initial executor number 5 must between min executor number10 and max executor number 50"),
                ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
                "initial executor number 5 must between min executor number10 and max executor number 50");

            // Anything unrecognised falls back to the generic error.
            this.checkHiveException(ss, new Exception(), ErrorMsg.SPARK_CREATE_CLIENT_ERROR);
        } finally {
            // Release the session opened for this test so it does not outlive it.
            ssm.closeSession(ss);
        }
    }

    /**
     * Creates a fresh {@link HiveConf} with {@code spark.master} set to {@code local}.
     *
     * @return the new configuration
     */
    private static HiveConf newLocalSparkConf() {
        HiveConf conf = new HiveConf();
        conf.set("spark.master", "local");
        return conf;
    }

    /**
     * Asserts only the canonical error message produced for {@code e}.
     *
     * @param ss session whose exception mapping is under test
     * @param e failure to translate
     * @param expectedErrMsg canonical error message the translation must carry
     */
    private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg) {
        this.checkHiveException(ss, e, expectedErrMsg, null);
    }

    /**
     * Asserts the canonical error message produced for {@code e} and, optionally, the
     * string the session reports as matched.
     *
     * @param ss session whose exception mapping is under test
     * @param e failure to translate
     * @param expectedErrMsg canonical error message the translation must carry
     * @param expectedMatchedStr expected value of {@code ss.getMatchedString()}, or
     *     {@code null} to skip that check
     */
    private void checkHiveException(SparkSessionImpl ss, Throwable e, ErrorMsg expectedErrMsg, String expectedMatchedStr) {
        HiveException he = ss.getHiveException(e);
        Assert.assertEquals(expectedErrMsg, he.getCanonicalErrorMsg());
        if (expectedMatchedStr != null) {
            Assert.assertEquals(expectedMatchedStr, ss.getMatchedString());
        }
    }

    /**
     * Opens a session with {@code conf} and asserts that the {@link SparkConf} of its
     * client holds {@code expectedValue} for {@code paramName}.
     *
     * <p>The session is closed and the manager shut down whether or not the assertions
     * pass.
     *
     * @param conf Hive configuration used to obtain the session
     * @param paramName name of the Spark property to inspect
     * @param expectedValue value the property must have in the Spark configuration
     * @throws HiveException if obtaining or closing the session fails
     */
    private void checkSparkConf(HiveConf conf, String paramName, String expectedValue) throws HiveException {
        SparkSessionManagerImpl sessionManager = SparkSessionManagerImpl.getInstance();
        try {
            SparkSessionImpl sparkSessionImpl = (SparkSessionImpl) sessionManager.getSession(null, conf, true);
            try {
                Assert.assertTrue(sparkSessionImpl.isOpen());
                HiveSparkClient hiveSparkClient = sparkSessionImpl.getHiveSparkClient();
                SparkConf sparkConf = hiveSparkClient.getSparkConf();
                Assert.assertEquals(expectedValue, sparkConf.get(paramName));
            } finally {
                sessionManager.closeSession(sparkSessionImpl);
            }
        } finally {
            sessionManager.shutdown();
        }
    }

    /**
     * Worker that repeatedly obtains and returns a session from the enclosing test's
     * {@code sessionManagerHS2}, asserting on every round that it gets its previous
     * session back and that the session is open.
     *
     * <p>Any failure is recorded in {@code anyFailedSessionThread}; that flag, not the
     * {@link AssertionError} raised at the end of {@link #run()}, is what the test thread
     * checks.
     */
    public class SessionThread
    implements Runnable {
        @Override
        public void run() {
            try {
                // Seeded with the thread id.
                Random random = new Random(Thread.currentThread().getId());
                String threadName = Thread.currentThread().getName();
                System.out.println(threadName + " started.");
                HiveConf conf = newLocalSparkConf();
                SparkSession prevSession = null;
                SparkSession currentSession = null;
                for (int i = 0; i < SESSION_USES_PER_THREAD; ++i) {
                    currentSession = TestSparkSessionManagerImpl.this.sessionManagerHS2.getSession(prevSession, conf, true);
                    // After the first round the manager must hand back the same session.
                    Assert.assertTrue(prevSession == null || prevSession == currentSession);
                    Assert.assertTrue(currentSession.isOpen());
                    System.out.println(String.format("%s got session (%d): %s", threadName, i, currentSession.getSessionId()));
                    // Hold the session for one to three seconds before returning it.
                    Thread.sleep((random.nextInt(3) + 1) * 1000);
                    TestSparkSessionManagerImpl.this.sessionManagerHS2.returnSession(currentSession);
                    prevSession = currentSession;
                }
                TestSparkSessionManagerImpl.this.sessionManagerHS2.closeSession(currentSession);
                System.out.println(threadName + " ended.");
            } catch (Throwable e) {
                // Record the failure first: the error thrown below is raised on this worker
                // thread, not on the thread running the test method.
                TestSparkSessionManagerImpl.this.anyFailedSessionThread = true;
                String msg = String.format("Error executing '%s'", Thread.currentThread().getName());
                LOG.error(msg, e);
                Assert.fail(msg + " " + StringUtils.stringifyException(e));
            }
        }
    }
}

