/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.tez;

import hive.com.google.common.annotations.VisibleForTesting;
import hive.com.google.common.collect.Lists;
import hive.com.google.common.collect.Sets;
import hive.com.google.common.math.DoubleMath;
import hive.com.google.common.util.concurrent.FutureCallback;
import hive.com.google.common.util.concurrent.Futures;
import hive.com.google.common.util.concurrent.ListenableFuture;
import hive.com.google.common.util.concurrent.MoreExecutors;
import hive.com.google.common.util.concurrent.SettableFuture;
import hive.com.google.common.util.concurrent.ThreadFactoryBuilder;
import hive.org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy;
import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode;
import org.apache.hadoop.hive.ql.exec.tez.GuaranteedTasksAllocator;
import org.apache.hadoop.hive.ql.exec.tez.KillMoveTriggerActionHandler;
import org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClientImpl;
import org.apache.hadoop.hive.ql.exec.tez.PerPoolTriggerValidatorRunnable;
import org.apache.hadoop.hive.ql.exec.tez.QueryAllocationManager;
import org.apache.hadoop.hive.ql.exec.tez.RestrictedConfigChecker;
import org.apache.hadoop.hive.ql.exec.tez.SessionExpirationTracker;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPool;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping;
import org.apache.hadoop.hive.ql.exec.tez.WmEvent;
import org.apache.hadoop.hive.ql.exec.tez.WmPoolMetrics;
import org.apache.hadoop.hive.ql.exec.tez.WmTezSession;
import org.apache.hadoop.hive.ql.exec.tez.WorkloadManagerMxBean;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hive.com.fasterxml.jackson.annotation.JsonAutoDetect;
import org.apache.hive.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.hive.com.fasterxml.jackson.databind.introspect.VisibilityChecker;
import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkloadManager
extends TezSessionPoolSession.AbstractTriggerValidator
implements TezSessionPoolSession.Manager,
SessionExpirationTracker.RestartImpl,
WorkloadManagerMxBean {
    private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
    // NOTE(review): presumably the separator between levels of a hierarchical pool name;
    // the usages are outside this view - confirm.
    private static final char POOL_SEPARATOR = '.';
    private static final String POOL_SEPARATOR_STR = ".";
    // Assigned at the end of the constructor and configured there with getter/setter
    // auto-detection disabled and FAIL_ON_EMPTY_BEANS off.
    private final ObjectMapper objectMapper;
    private final HiveConf conf;
    // Pool of Tez AM sessions; sized in the constructor to the total query parallelism of the plan.
    private final TezSessionPool<WmTezSession> tezAmPool;
    // Null-checked before use in start() and stop().
    private final SessionExpirationTracker expirationTracker;
    private final RestrictedConfigChecker restrictedConfig;
    // Started in start(), stopped in stop(); its cluster-changed callback is set in the constructor.
    private final QueryAllocationManager allocationManager;
    private final String yarnQueue;
    // Read from HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, in milliseconds.
    private final int amRegistryTimeoutMs;
    // Read from HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.
    private final boolean allowAnyPool;
    // DefaultMetricsSystem instance when HIVE_SERVER2_WM_POOL_METRICS is enabled, otherwise null.
    private final MetricsSystem metricsSystem;
    // stop() synchronizes on this map while copying its key set.
    private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions = new IdentityHashMap();
    private final AtomicLong getRequestVersion = new AtomicLong(Long.MIN_VALUE);
    // Pool name -> pool state; read by the *OnMasterThread methods.
    private Map<String, PoolState> pools;
    private String rpName;
    private String defaultPool;
    private int totalQueryParallelism;
    // Kill-query contexts keyed by session identity; entries are added and removed in
    // processCurrentEvents().
    private Map<WmTezSession, KillQueryContext> killQueryInProgress = new IdentityHashMap<WmTezSession, KillQueryContext>();
    private UserPoolMapping userPoolMapping;
    // currentLock guards hasChanges and current; runWmThread() waits on hasChangesCondition
    // until hasChanges is set.
    private final ReentrantLock currentLock = new ReentrantLock();
    private final Condition hasChangesCondition = this.currentLock.newCondition();
    // Two event buffers: runWmThread() takes the one referenced by "current" for processing
    // and points "current" at the other.
    private final EventState one = new EventState();
    private final EventState two = new EventState();
    private boolean hasChanges = false;
    private EventState current = this.one;
    // Work collected by processCurrentEvents() and then dispatched by scheduleWork().
    private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
    // Created lazily by initTriggers(); null until then.
    private PerPoolTriggerValidatorRunnable triggerValidatorRunnable;
    // Pool name -> trigger provider; refreshed by updateSessionTriggerProvidersOnMasterThread()
    // and handed to the trigger validator runnable.
    private Map<String, SessionTriggerProvider> perPoolProviders = new ConcurrentHashMap<String, SessionTriggerProvider>();
    // Daemon thread running runWmThread(); started from the constructor.
    @VisibleForTesting
    protected final Thread wmThread;
    // Fixed-size daemon pool on which scheduleWork() submits its tasks.
    private final ExecutorService workPool;
    // Single-threaded daemon scheduler; its users are outside this view.
    private final ScheduledExecutorService timeoutPool;
    // May be null (null-checked in the constructor, start() and stop()).
    private LlapPluginEndpointClientImpl amComm;
    // Callback that ignores success and logs any failure as a fatal workload management error.
    private static final FutureCallback<Object> FATAL_ERROR_CALLBACK = new FutureCallback<Object>(){

        @Override
        public void onSuccess(Object result) {
        }

        @Override
        public void onFailure(Throwable t) {
            LOG.error("Workload management fatal error", t);
        }
    };
    // Set by create() and reset to null by stop().
    private static volatile WorkloadManager INSTANCE;

    /**
     * Returns the currently published workload manager.
     *
     * @return the instance stored by {@code create}, or {@code null} when none is published
     *         (never created, or cleared again by {@code stop})
     */
    public static WorkloadManager getInstance() {
        final WorkloadManager published = INSTANCE;
        return published;
    }

    /**
     * Builds the workload manager together with its AM plugin client and guaranteed-task
     * allocator, and publishes it as the shared instance.
     *
     * @param yarnQueue YARN queue name handed to the new manager
     * @param conf      Hive configuration used for the manager and its helpers
     * @param plan      resource plan to apply initially; the constructor accepts {@code null}
     * @return the newly created manager (never {@code null})
     * @throws ExecutionException   if applying the initial resource plan fails
     * @throws InterruptedException if interrupted while waiting for the initial plan to apply
     */
    public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullResourcePlan plan) throws ExecutionException, InterruptedException {
        assert (INSTANCE == null);
        LlapPluginEndpointClientImpl amComm = new LlapPluginEndpointClientImpl(conf, null, -1);
        GuaranteedTasksAllocator qam = new GuaranteedTasksAllocator(conf, amComm);
        WorkloadManager created = new WorkloadManager(amComm, yarnQueue, conf, qam, plan);
        INSTANCE = created;
        // Return the local reference rather than re-reading the volatile field: a concurrent
        // stop() may already have reset INSTANCE to null.
        return created;
    }

    /**
     * Creates the manager, starts the master thread and synchronously applies the initial
     * resource plan.
     *
     * @param amComm    AM plugin client; may be {@code null}, in which case it is never initialized
     * @param yarnQueue YARN queue name to remember
     * @param conf      Hive configuration from which all settings are read
     * @param qam       allocation manager; receives a callback that reports cluster state changes
     * @param plan      resource plan to apply; {@code null} yields zero total query parallelism
     * @throws ExecutionException   if applying the resource plan fails
     * @throws InterruptedException if interrupted while waiting for the plan to be applied
     */
    @VisibleForTesting
    WorkloadManager(LlapPluginEndpointClientImpl amComm, String yarnQueue, HiveConf conf, QueryAllocationManager qam, WMFullResourcePlan plan) throws ExecutionException, InterruptedException {
        this.yarnQueue = yarnQueue;
        this.conf = conf;
        this.totalQueryParallelism = WorkloadManager.determineQueryParallelism(plan);
        this.allocationManager = qam;
        this.allocationManager.setClusterChangedCallback(() -> this.notifyOfClusterStateChange());
        this.amComm = amComm;
        if (this.amComm != null) {
            this.amComm.init(conf);
        }
        LOG.info("Initializing with " + this.totalQueryParallelism + " total query parallelism");
        this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
        this.tezAmPool = new TezSessionPool<WmTezSession>(conf, this.totalQueryParallelism, true, oldSession -> this.createSession(oldSession == null ? null : oldSession.getConf()));
        this.restrictedConfig = new RestrictedConfigChecker(conf);
        this.expirationTracker = SessionExpirationTracker.create(conf, this);
        this.workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_WM_WORKER_THREADS), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Workload management worker %d").build());
        this.timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Workload management timeout thread").build());
        this.allowAnyPool = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC);
        this.metricsSystem = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_WM_POOL_METRICS) ? DefaultMetricsSystem.instance() : null;
        // Assign every final field before the master thread is started: once start() is called
        // "this" is visible to that thread, and only writes made before Thread.start() are
        // guaranteed to be seen by it.
        this.objectMapper = new ObjectMapper();
        this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        this.objectMapper.setVisibility((VisibilityChecker<?>)this.objectMapper.getSerializationConfig().getDefaultVisibilityChecker().withGetterVisibility(JsonAutoDetect.Visibility.NONE).withSetterVisibility(JsonAutoDetect.Visibility.NONE));
        this.wmThread = new Thread(() -> this.runWmThread(), "Workload management master");
        this.wmThread.setDaemon(true);
        this.wmThread.start();
        // Block until the master thread has applied the initial plan.
        this.updateResourcePlanAsync(plan).get();
    }

    /**
     * Adds up the query parallelism of every pool in a resource plan.
     *
     * @param plan the resource plan, or {@code null}
     * @return the sum over all pools; {@code 0} when {@code plan} is {@code null}
     */
    private static int determineQueryParallelism(WMFullResourcePlan plan) {
        return plan == null
            ? 0
            : plan.getPools().stream().mapToInt(WMPool::getQueryParallelism).sum();
    }

    /**
     * Starts the manager's components in order: trigger validation, the Tez AM pool, the
     * expiration tracker (if any), the AM plugin client (if any) and the allocation manager.
     *
     * @throws Exception if any component fails to start
     */
    public void start() throws Exception {
        initTriggers();
        tezAmPool.start();
        // The tracker and the AM plugin client are optional.
        if (expirationTracker != null) {
            expirationTracker.start();
        }
        if (amComm != null) {
            amComm.start();
        }
        allocationManager.start();
    }

    /**
     * Creates the per-pool trigger validator and starts it, unless one already exists.
     * The validation interval comes from {@code HIVE_TRIGGER_VALIDATION_INTERVAL}.
     */
    private void initTriggers() {
        if (triggerValidatorRunnable != null) {
            // Already initialized by an earlier call.
            return;
        }
        final long intervalMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, TimeUnit.MILLISECONDS);
        final KillMoveTriggerActionHandler actionHandler = new KillMoveTriggerActionHandler(this);
        triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, actionHandler, intervalMs);
        startTriggerValidator(intervalMs);
    }

    /**
     * Shuts the manager down: closes every open session, stops the expiration tracker, the
     * allocation manager, the master thread, the AM plugin client, both executors and the
     * trigger validator, then clears the shared instance.
     *
     * @throws Exception if closing a session or stopping a component fails; the remaining
     *                   steps are not executed in that case
     */
    public void stop() throws Exception {
        // Snapshot the open sessions under the map's monitor; close them outside of it.
        final List<TezSessionPoolSession> snapshot;
        synchronized (openSessions) {
            snapshot = new ArrayList<TezSessionPoolSession>(openSessions.keySet());
        }
        for (TezSessionState openSession : snapshot) {
            openSession.close(false);
        }
        if (expirationTracker != null) {
            expirationTracker.stop();
        }
        allocationManager.stop();
        if (wmThread != null) {
            // The master thread exits when it observes the interrupt.
            wmThread.interrupt();
        }
        if (amComm != null) {
            amComm.stop();
        }
        workPool.shutdownNow();
        timeoutPool.shutdownNow();
        if (triggerValidatorRunnable != null) {
            stopTriggerValidator();
        }
        INSTANCE = null;
    }

    /**
     * Refreshes the per-pool trigger providers from the current pool state. For every pool the
     * existing provider receives read-only views of the pool's triggers and sessions; a pool
     * without a provider gets a new one.
     */
    private void updateSessionTriggerProvidersOnMasterThread() {
        for (Map.Entry<String, PoolState> entry : this.pools.entrySet()) {
            String poolName = entry.getKey();
            PoolState poolState = entry.getValue();
            List<Trigger> triggers = Collections.unmodifiableList(poolState.getTriggers());
            List<TezSessionState> sessionStates = Collections.unmodifiableList(poolState.getSessions());
            SessionTriggerProvider sessionTriggerProvider = this.perPoolProviders.get(poolName);
            if (sessionTriggerProvider == null) {
                this.perPoolProviders.put(poolName, new SessionTriggerProvider(sessionStates, triggers));
                continue;
            }
            // Use the reference that was null-checked instead of looking it up again: the map
            // is concurrent, so a repeated get() is not guaranteed to return the same value.
            sessionTriggerProvider.setTriggers(triggers);
            sessionTriggerProvider.setSessions(sessionStates);
        }
    }

    /**
     * Exposes the live map of trigger providers keyed by pool name.
     *
     * @return the internal map itself, not a copy
     */
    @VisibleForTesting
    Map<String, SessionTriggerProvider> getAllSessionTriggerProviders() {
        final Map<String, SessionTriggerProvider> providers = perPoolProviders;
        return providers;
    }

    /**
     * Main loop of the master thread. Each iteration waits until changes are flagged, takes
     * the current event buffer and points {@code current} at the other one, then processes
     * the taken events, schedules the resulting work and refreshes the trigger providers.
     * The loop ends when the thread is interrupted; any other failure is logged, reported to
     * the pending test/resource-plan futures, and the loop continues.
     */
    private void runWmThread() {
        while (true) {
            EventState currentEvents;
            this.currentLock.lock();
            try {
                while (!this.hasChanges) {
                    try {
                        // Timed wait, re-checking hasChanges after every wake-up.
                        this.hasChangesCondition.await(1L, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        LOG.warn("WM thread was interrupted and will now exit");
                        // The enclosing finally releases the lock exactly once; unlocking here
                        // as well would make that second unlock throw
                        // IllegalMonitorStateException.
                        return;
                    }
                }
                this.hasChanges = false;
                currentEvents = this.current;
                // Swap buffers so new events go to the other buffer while this one is processed.
                this.current = currentEvents == this.one ? this.two : this.one;
            }
            finally {
                this.currentLock.unlock();
            }
            try {
                LOG.info("Processing current events");
                this.processCurrentEvents(currentEvents, this.syncWork);
                this.scheduleWork(this.syncWork);
                this.updateSessionTriggerProvidersOnMasterThread();
            }
            catch (InterruptedException ex) {
                LOG.warn("WM thread was interrupted and will now exit");
                return;
            }
            catch (AssertionError | Exception ex) {
                LOG.error("WM thread encountered an error but will attempt to continue", (Throwable)ex);
                // Fail the futures that were waiting on this batch so their callers see the error.
                if (currentEvents.testEvent != null) {
                    currentEvents.testEvent.setException((Throwable)ex);
                    currentEvents.testEvent = null;
                }
                if (currentEvents.applyRpFuture != null) {
                    currentEvents.applyRpFuture.setException((Throwable)ex);
                    currentEvents.applyRpFuture = null;
                }
            }
        }
    }

    private void scheduleWork(WmThreadSyncWork context) {
        for (KillQueryContext killCtx : context.toKillQuery.values()) {
            WmTezSession toKill = killCtx.session;
            String reason = killCtx.reason;
            LOG.info("Killing query for {}", (Object)toKill);
            this.workPool.submit(() -> {
                block7: {
                    String queryId = toKill.getQueryId();
                    KillQuery kq = toKill.getKillQuery();
                    if (kq != null && queryId != null) {
                        WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
                        LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
                        try {
                            kq.killQuery(queryId, reason);
                            this.addKillQueryResult(toKill, true);
                            killCtx.killSessionFuture.set(true);
                            wmEvent.endEvent(toKill);
                            LOG.debug("Killed " + queryId);
                            return;
                        }
                        catch (HiveException ex) {
                            LOG.error("Failed to kill " + queryId + "; will try to restart AM instead", (Throwable)ex);
                            break block7;
                        }
                    }
                    LOG.info("Will queue restart for {}; queryId {}, killQuery {}", new Object[]{toKill, queryId, kq});
                    break block7;
                    finally {
                        toKill.setQueryId(null);
                    }
                }
                this.addKillQueryResult(toKill, false);
            });
        }
        context.toKillQuery.clear();
        for (WmTezSession toRestart : context.toRestartInUse) {
            LOG.info("Replacing {} with a new session", (Object)toRestart);
            toRestart.setQueryId(null);
            this.workPool.submit(() -> {
                try {
                    WmEvent wmEvent = new WmEvent(WmEvent.EventType.RESTART);
                    this.tezAmPool.replaceSession(toRestart);
                    wmEvent.endEvent(toRestart);
                }
                catch (Exception ex) {
                    LOG.error("Failed to restart an old session; ignoring", (Throwable)ex);
                }
            });
        }
        context.toRestartInUse.clear();
        for (WmTezSession toDestroy : context.toDestroyNoRestart) {
            LOG.info("Closing {} without restart", (Object)toDestroy);
            this.workPool.submit(() -> {
                try {
                    WmEvent wmEvent = new WmEvent(WmEvent.EventType.DESTROY);
                    toDestroy.close(false);
                    wmEvent.endEvent(toDestroy);
                }
                catch (Exception ex) {
                    LOG.error("Failed to close an old session; ignoring " + ex.getMessage());
                }
            });
        }
        context.toDestroyNoRestart.clear();
        for (Path path : context.pathsToDelete) {
            LOG.info("Deleting {}", (Object)path);
            this.workPool.submit(() -> {
                try {
                    path.getFileSystem((Configuration)this.conf).delete(path, true);
                }
                catch (Exception ex) {
                    LOG.error("Failed to delete an old path; ignoring " + ex.getMessage());
                }
            });
        }
        context.pathsToDelete.clear();
    }

    /**
     * Processes one batch of events on the master thread. The steps run in a fixed order:
     * session init results, kill-query results, destroyed sessions, returned sessions, reopen
     * requests, update errors, a new resource plan, session moves, get requests, in-progress
     * kills, cluster state changes, and finally redistribution of the affected pools. Each
     * event collection is cleared after it is handled, and the dump-state, test and
     * resource-plan futures are completed at the end.
     *
     * @param e        the event batch to process; its collections are emptied
     * @param syncWork receives the work (kills, restarts, closes) that must run off this thread
     * @throws Exception propagated from the individual handlers
     */
    private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throws Exception {
        GetRequest req;
        RemoveSessionResult rr;
        // Names of the pools whose state changed and must be reprocessed at the end.
        HashSet<String> poolsToRedistribute = new HashSet<String>();
        // 1. Results of session initialization.
        for (SessionInitContext sessionInitContext : e.initResults) {
            this.handleInitResultOnMasterThread(sessionInitContext, syncWork, poolsToRedistribute);
        }
        e.initResults.clear();
        // 2. Results of kill-query attempts, matched against the kills in progress.
        for (Map.Entry entry : e.killQueryResults.entrySet()) {
            WmTezSession killQuerySession = (WmTezSession)entry.getKey();
            boolean killResult = (Boolean)entry.getValue();
            LOG.debug("Processing KillQuery {} for {}", (Object)(killResult ? "success" : "failure"), (Object)killQuerySession);
            KillQueryContext killCtx = this.killQueryInProgress.get(killQuerySession);
            if (killCtx == null) {
                LOG.error("Internal error - cannot find the context for killing {}", (Object)killQuerySession);
                continue;
            }
            // The callback receives the negated kill result.
            killCtx.handleKillQueryCallback(!killResult);
        }
        e.killQueryResults.clear();
        // 3. Destroyed sessions; a session that was also returned is only destroyed.
        for (WmTezSession wmTezSession : e.toDestroy) {
            if (e.toReturn.remove(wmTezSession)) {
                LOG.warn("The session was both destroyed and returned by the user; destroying");
            }
            LOG.info("Destroying {}", (Object)wmTezSession);
            rr = this.handleReturnedInUseSessionOnMasterThread(e, wmTezSession, poolsToRedistribute, false);
            // Only OK and NOT_FOUND lead to a restart; any other result is skipped.
            if (rr != RemoveSessionResult.OK && rr != RemoveSessionResult.NOT_FOUND) continue;
            syncWork.toRestartInUse.add(wmTezSession);
        }
        e.toDestroy.clear();
        // 4. Returned sessions.
        block13: for (WmTezSession wmTezSession : e.toReturn) {
            LOG.info("Returning {}", (Object)wmTezSession);
            rr = this.handleReturnedInUseSessionOnMasterThread(e, wmTezSession, poolsToRedistribute, true);
            switch (rr) {
                case OK: {
                    WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
                    boolean wasReturned = this.tezAmPool.returnSessionAsync(wmTezSession);
                    if (!wasReturned) {
                        // The AM pool did not take the session back; close it without restart.
                        syncWork.toDestroyNoRestart.add(wmTezSession);
                        continue block13;
                    }
                    if (wmTezSession.getWmContext() != null && wmTezSession.getWmContext().isQueryCompleted()) {
                        wmTezSession.resolveReturnFuture();
                    }
                    wmEvent.endEvent(wmTezSession);
                    continue block13;
                }
                case NOT_FOUND: {
                    syncWork.toRestartInUse.add(wmTezSession);
                    continue block13;
                }
                case IGNORE: {
                    continue block13;
                }
            }
            throw new AssertionError((Object)("Unknown state " + rr));
        }
        e.toReturn.clear();
        // 5. Reopen requests.
        for (Map.Entry entry : e.toReopen.entrySet()) {
            LOG.info("Reopening {}", entry.getKey());
            this.handeReopenRequestOnMasterThread(e, (WmTezSession)entry.getKey(), (SettableFuture)entry.getValue(), poolsToRedistribute, syncWork);
        }
        e.toReopen.clear();
        // 6. Sessions whose update failed.
        for (Map.Entry entry : e.updateErrors.entrySet()) {
            WmTezSession sessionWithUpdateError = (WmTezSession)entry.getKey();
            int failedEndpointVersion = (Integer)entry.getValue();
            LOG.info("Update failed for {}", (Object)sessionWithUpdateError);
            this.handleUpdateErrorOnMasterThread(sessionWithUpdateError, failedEndpointVersion, e.toReuse, syncWork, poolsToRedistribute);
        }
        e.updateErrors.clear();
        // 7. New (or cleared) resource plan. hasRequeues records whether applying it changed
        //    the number of pending get requests.
        boolean hasRequeues = false;
        if (e.resourcePlanToApply != null || e.doClearResourcePlan) {
            LOG.info("Applying new resource plan");
            int n = e.getRequests.size();
            this.applyNewResourcePlanOnMasterThread(e, syncWork, poolsToRedistribute);
            hasRequeues = n != e.getRequests.size();
        }
        e.resourcePlanToApply = null;
        e.doClearResourcePlan = false;
        // 8. Session moves; the MOVE events collected here are ended near the end of the method.
        HashMap<WmTezSession, WmEvent> hashMap = new HashMap<WmTezSession, WmEvent>();
        for (MoveSession moveSession : e.moveSessions) {
            this.handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse, hashMap);
        }
        e.moveSessions.clear();
        // 9. Pending get requests, in queue order.
        while ((req = e.getRequests.pollFirst()) != null) {
            LOG.info("Processing a new get request from " + req.mappingInput);
            this.queueGetRequestOnMasterThread(req, poolsToRedistribute, syncWork);
        }
        e.toReuse.clear();
        // 10. Advance every kill in progress; finished ones are removed from the map.
        Iterator<KillQueryContext> iter = this.killQueryInProgress.values().iterator();
        block18: while (iter.hasNext()) {
            KillQueryContext ctx = iter.next();
            KillQueryResult killQueryResult = ctx.process();
            switch (killQueryResult) {
                case IN_PROGRESS: {
                    continue block18;
                }
                case OK: {
                    iter.remove();
                    LOG.debug("Kill query succeeded; returning to the pool: {}", (Object)ctx.session);
                    ctx.killSessionFuture.set(true);
                    WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
                    if (!this.tezAmPool.returnSessionAsync(ctx.session)) {
                        syncWork.toDestroyNoRestart.add(ctx.session);
                        continue block18;
                    }
                    if (ctx.session.getWmContext() != null && ctx.session.getWmContext().isQueryCompleted()) {
                        ctx.session.resolveReturnFuture();
                    }
                    wmEvent.endEvent(ctx.session);
                    continue block18;
                }
                case RESTART_REQUIRED: {
                    iter.remove();
                    ctx.killSessionFuture.set(true);
                    LOG.debug("Kill query failed; restarting: {}", (Object)ctx.session);
                    syncWork.toRestartInUse.add(ctx.session);
                    continue block18;
                }
            }
            throw new AssertionError((Object)("Unknown state " + killQueryResult));
        }
        // 11. A cluster state change marks every pool for redistribution.
        if (e.hasClusterStateChanged) {
            LOG.info("Processing a cluster state change");
            poolsToRedistribute.addAll(this.pools.keySet());
            e.hasClusterStateChanged = false;
        }
        // 12. Reprocess every affected pool.
        for (String string : poolsToRedistribute) {
            // NOTE(review): the guard checks the debug level but the message is logged at info;
            // one of the two looks unintended - confirm which level is wanted.
            if (LOG.isDebugEnabled()) {
                LOG.info("Processing changes for pool " + string + ": " + this.pools.get(string));
            }
            this.processPoolChangesOnMasterThread(string, hasRequeues, syncWork);
        }
        // 13. Register the newly queued kills as in progress; an existing entry for the same
        //     session is reported as an internal error.
        for (KillQueryContext killQueryContext : syncWork.toKillQuery.values()) {
            if (this.killQueryInProgress.put(killQueryContext.session, killQueryContext) == null) continue;
            LOG.error("One query killed several times - internal error {}", (Object)killQueryContext.session);
        }
        // 14. End the MOVE events recorded in step 8.
        for (Map.Entry entry : hashMap.entrySet()) {
            ((WmEvent)entry.getValue()).endEvent((WmTezSession)entry.getKey());
        }
        // 15. Complete the pending futures: state dump, test event, resource-plan application.
        if (e.dumpStateFuture != null) {
            ArrayList<String> result = new ArrayList<String>();
            result.add("RESOURCE PLAN " + this.rpName + "; default pool " + this.defaultPool);
            for (PoolState ps : this.pools.values()) {
                this.dumpPoolState(ps, result);
            }
            e.dumpStateFuture.set(result);
            e.dumpStateFuture = null;
        }
        if (e.testEvent != null) {
            e.testEvent.set(true);
            e.testEvent = null;
        }
        if (e.applyRpFuture != null) {
            e.applyRpFuture.set(true);
            e.applyRpFuture = null;
        }
    }

    /**
     * Appends a human-readable description of one pool to {@code set}: a summary line,
     * followed by one line per running session, per initializing session and per queued
     * get request.
     *
     * @param ps  the pool to describe
     * @param set the output list that the lines are appended to
     */
    private void dumpPoolState(PoolState ps, List<String> set) {
        StringBuilder sb = new StringBuilder();
        // A single percent sign: this text is appended verbatim, not passed through a format
        // string, so the "%%" escape would be printed as two characters.
        sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism).append(", % ").append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size()).append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ").append(ps.queue.size());
        set.add(sb.toString());
        sb.setLength(0);
        for (WmTezSession wmTezSession : ps.sessions) {
            // A session without a cluster fraction is reported as 0.0.
            double cf = wmTezSession.hasClusterFraction() ? wmTezSession.getClusterFraction() : 0.0;
            sb.append("RUNNING: ").append(cf).append(" (").append(wmTezSession.getAllocationState()).append(") => ").append(wmTezSession.getSessionId());
            set.add(sb.toString());
            sb.setLength(0);
        }
        for (SessionInitContext sessionInitContext : ps.initializingSessions) {
            sb.append("INITIALIZING: state ").append((Object)sessionInitContext.state);
            set.add(sb.toString());
            sb.setLength(0);
        }
        for (GetRequest getRequest : ps.queue) {
            sb.append("QUEUED: from ").append(getRequest.mappingInput);
            set.add(sb.toString());
            sb.setLength(0);
        }
    }

    /**
     * Handles a request to move a session to another pool. The move future is completed with
     * {@code true} only when the session was removed from its pool and added to the
     * destination; in every other case it is completed with {@code false}. When the
     * destination has no free capacity, a kill of the session's query is queued.
     *
     * @param moveSession         the move request (source session, destination pool, future)
     * @param syncWork            receives the kill to schedule when the destination is full
     * @param poolsToRedistribute collects the names of pools changed by the move
     * @param toReuse             pending reuse requests, passed on when a kill is queued
     * @param recordMoveEvents    receives the MOVE event of a successful move
     */
    private void handleMoveSessionOnMasterThread(MoveSession moveSession, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute, Map<WmTezSession, GetRequest> toReuse, Map<WmTezSession, WmEvent> recordMoveEvents) {
        final String targetPool = moveSession.destPool;
        LOG.info("Handling move session event: {}", (Object)moveSession);

        if (!validMove(moveSession.srcSession, targetPool)) {
            LOG.error("Validation failed for move session: {}. Invalid move or session/pool got removed.", (Object)moveSession);
            moveSession.future.set(false);
            return;
        }

        final WmEvent event = new WmEvent(WmEvent.EventType.MOVE);
        final RemoveSessionResult removal = checkAndRemoveSessionFromItsPool(moveSession.srcSession, poolsToRedistribute, true, true);
        if (removal != RemoveSessionResult.OK) {
            LOG.error("Failed to move session: {}. Session is not removed from its pool.", (Object)moveSession);
            moveSession.future.set(false);
            return;
        }

        if (!capacityAvailable(targetPool).booleanValue()) {
            // No room in the destination: queue a kill of the query instead of moving it.
            final WmTezSession victim = moveSession.srcSession;
            final KillQueryContext killRequest = new KillQueryContext(victim, "Destination pool " + targetPool + " is full. Killing query.");
            resetAndQueueKill(syncWork.toKillQuery, killRequest, toReuse);
            moveSession.future.set(false);
            return;
        }

        final Boolean wasAdded = checkAndAddSessionToAnotherPool(moveSession.srcSession, targetPool, poolsToRedistribute);
        if (Boolean.TRUE.equals(wasAdded)) {
            moveSession.future.set(true);
            recordMoveEvents.put(moveSession.srcSession, event);
            return;
        }
        LOG.error("Failed to move session: {}. Session is not added to destination.", (Object)moveSession);
        moveSession.future.set(false);
    }

    /**
     * Tells whether a pool can take one more active session.
     *
     * @param destPoolName name of a pool present in {@code pools}
     * @return {@code TRUE} when the pool's active session count is below its query parallelism
     */
    private Boolean capacityAvailable(String destPoolName) {
        final PoolState target = pools.get(destPoolName);
        final boolean hasRoom = target.getTotalActiveSessions() < target.queryParallelism;
        return Boolean.valueOf(hasRoom);
    }

    /**
     * Checks whether a session may be moved to the given pool.
     *
     * @param srcSession the session to move; may be {@code null}
     * @param destPool   the destination pool name; may be {@code null}
     * @return {@code true} when both arguments are non-null, the session is relevant for WM
     *         and has a pool name, both its pool and the destination exist, and the two pool
     *         names differ (compared ignoring case)
     */
    private boolean validMove(WmTezSession srcSession, String destPool) {
        if (srcSession == null || destPool == null) {
            return false;
        }
        if (srcSession.isIrrelevantForWm() || srcSession.getPoolName() == null) {
            return false;
        }
        final boolean bothPoolsKnown = pools.containsKey(srcSession.getPoolName()) && pools.containsKey(destPool);
        return bothPoolsKnown && !srcSession.getPoolName().equalsIgnoreCase(destPool);
    }

    /**
     * Applies the result of a session initialization. A canceled initialization is ignored.
     * Otherwise the context is removed from its pool's initializing list, the pool is marked
     * for redistribution, and a successfully created session is added to the pool - or queued
     * for restart when the pool no longer exists.
     *
     * @param sw                  the initialization context holding the result
     * @param syncWork            receives the session to restart when its pool has disappeared
     * @param poolsToRedistribute collects the name of the affected pool
     */
    private void handleInitResultOnMasterThread(SessionInitContext sw, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
        final WmTezSession created;
        sw.lock.lock();
        try {
            if (sw.state == SessionInitState.CANCELED) {
                return;
            }
            assert (sw.state == SessionInitState.DONE);
            // Take the session out of the context while holding its lock.
            created = sw.session;
            sw.session = null;
        }
        finally {
            sw.lock.unlock();
        }

        final String outcome = created == null ? "failed" : "successful";
        LOG.info("Processing " + outcome + " initialization result for pool " + sw.poolName);

        final PoolState owner = pools.get(sw.poolName);
        final boolean removed = owner != null && owner.initializingSessions.remove(sw);
        if (!removed) {
            LOG.error("Cannot remove initializing session from the pool " + sw.poolName + " - internal error");
        }
        poolsToRedistribute.add(sw.poolName);

        if (created == null) {
            return;
        }
        if (owner != null) {
            owner.sessions.add(created);
            return;
        }
        LOG.error("Cannot add new session to the pool " + sw.poolName + " because it was removed unexpectedly - internal error " + created);
        syncWork.toRestartInUse.add(created);
    }

    /**
     * Prepares a returned or destroyed session for removal from its pool. Any update error,
     * reopen request or reuse request still pending for the session in the same batch is
     * dropped (the latter two are failed with an {@link AssertionError}), the query id is
     * cleared, and the session is removed from its pool.
     *
     * @param e                   the current event batch
     * @param session             the session being returned or destroyed
     * @param poolsToRedistribute collects the names of pools changed by the removal
     * @param isReturn            {@code true} for a return, {@code false} for a destroy
     * @return the outcome of removing the session from its pool
     */
    private RemoveSessionResult handleReturnedInUseSessionOnMasterThread(EventState e, WmTezSession session, HashSet<String> poolsToRedistribute, boolean isReturn) {
        final boolean hadUpdateError = e.updateErrors.remove(session) != null;
        if (hadUpdateError) {
            LOG.info("Ignoring an update error for a session being destroyed or returned");
        }

        final SettableFuture<WmTezSession> pendingReopen = e.toReopen.remove(session);
        if (pendingReopen != null) {
            pendingReopen.setException(new AssertionError("Invalid reopen attempt"));
        }

        final GetRequest pendingReuse = e.toReuse.remove(session);
        if (pendingReuse != null) {
            pendingReuse.future.setException(new AssertionError("Invalid reuse attempt"));
        }

        session.setQueryId(null);
        return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn, true);
    }

    /**
     * Processes a request to reopen a session.
     *
     * <p>Conflicting update errors and reuse requests for the session are dropped first.
     * The session is then removed from its pool (without touching metrics) and, depending
     * on the removal result, either a replacement initialization is started in the same
     * pool, or the supplied future is failed.
     *
     * @param e the event state being processed
     * @param session the session to reopen
     * @param future completed by the replacement initialization, or failed here
     * @param poolsToRedistribute names of pools that must be re-processed
     * @param syncWork accumulator of work to be done outside this method
     * @throws Exception if starting the replacement initialization fails
     */
    private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session, SettableFuture<WmTezSession> future, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) throws Exception {
        if (e.updateErrors.remove(session) != null) {
            LOG.info("Ignoring an update error for a session being reopened");
        }
        GetRequest pendingReuse = e.toReuse.remove(session);
        if (pendingReuse != null) {
            pendingReuse.future.setException(new AssertionError("Invalid reuse attempt"));
        }
        // Capture the pool name before the removal below is invoked.
        String poolName = session.getPoolName();
        RemoveSessionResult removal = this.checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, false, false);
        switch (removal) {
            case OK: {
                PoolState pool = this.pools.get(poolName);
                SessionInitContext replacement = new SessionInitContext(future, poolName, session.getQueryId(), session.getWmContext(), session.extractHiveResources());
                pool.initializingSessions.add(replacement);
                replacement.start();
                syncWork.toRestartInUse.add(session);
                return;
            }
            case IGNORE: {
                future.setException(new RuntimeException("WM killed this session during reopen: " + session.getReasonForKill()));
                return;
            }
            case NOT_FOUND: {
                future.setException(new RuntimeException("Reopen failed due to an internal error"));
                syncWork.toRestartInUse.add(session);
                return;
            }
            default: {
                throw new AssertionError("Unknown state " + removal);
            }
        }
    }

    /**
     * Reacts to a failed resource-allocation update for a session.
     *
     * <p>If the session's endpoint information is newer than the version that failed,
     * the error is ignored. Otherwise any reuse request for the session is detached, the
     * session is removed from its pool, and unless the removal result is {@code IGNORE}
     * the session is marked irrelevant for WM and queued in
     * {@code syncWork.toRestartInUse}.
     *
     * @param session the session whose update failed
     * @param failedEndpointVersion endpoint version the failed update was sent to
     * @param toReuse pending reuse requests keyed by session
     * @param syncWork accumulator of work to be done outside this method
     * @param poolsToRedistribute names of pools that must be re-processed
     */
    private void handleUpdateErrorOnMasterThread(WmTezSession session, int failedEndpointVersion, IdentityHashMap<WmTezSession, GetRequest> toReuse, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
        Ref<Integer> currentVersion = new Ref<Integer>(-1);
        AmPluginNode.AmPluginInfo pluginInfo = session.getAmPluginInfo(currentVersion);
        boolean endpointRefreshed = pluginInfo != null && (Integer)currentVersion.value > failedEndpointVersion;
        if (endpointRefreshed) {
            LOG.info("Ignoring an update error; endpoint information has been updated to {}", (Object)pluginInfo);
            return;
        }
        GetRequest pendingReuse = toReuse.remove(session);
        if (pendingReuse != null) {
            pendingReuse.sessionToReuse = null;
        }
        RemoveSessionResult removal = this.checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, null, true);
        switch (removal) {
            case IGNORE: {
                return;
            }
            case OK:
            case NOT_FOUND: {
                session.setIsIrrelevantForWm("Failed to update resource allocation");
                syncWork.toRestartInUse.add(session);
                return;
            }
            default: {
                throw new AssertionError("Unknown state " + removal);
            }
        }
    }

    /**
     * Replaces the active resource plan with {@code e.resourcePlanToApply}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Update the plan name, default pool and user-to-pool mapping (all cleared
     *       when the plan is {@code null}).</li>
     *   <li>Rebuild {@code this.pools}, processing pools level by level (level = number
     *       of pool separators in the path) so a parent is always created before its
     *       children. Existing pool states are updated and reused; new ones are
     *       created.</li>
     *   <li>Push the remaining fraction of each pool to its metrics, if any.</li>
     *   <li>Attach triggers to pools when the plan carries both triggers and
     *       pool-trigger links.</li>
     *   <li>Destroy pool states that are no longer part of the plan.</li>
     *   <li>Resize the AM pool by the change in total query parallelism; when shrinking,
     *       sessions already queued for kill/restart are handed over for destruction
     *       first.</li>
     * </ol>
     *
     * <p>NOTE(review): a child pool whose parent path is absent from the plan, or a
     * pool-trigger link naming an unknown pool, fails with a
     * {@link NullPointerException}, as before.
     *
     * @param e the event state carrying the plan to apply (may carry {@code null})
     * @param syncWork accumulator of work to be done outside this method
     * @param poolsToRedistribute names of pools that must be re-processed
     */
    private void applyNewResourcePlanOnMasterThread(EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
        int totalQueryParallelism = 0;
        WMFullResourcePlan plan = e.resourcePlanToApply;
        if (plan == null) {
            LOG.info("Disabling workload management because the resource plan has been removed");
            this.rpName = null;
            this.defaultPool = null;
            this.userPoolMapping = new UserPoolMapping(null, null);
        } else {
            this.rpName = plan.getPlan().getName();
            this.defaultPool = plan.getPlan().getDefaultPoolPath();
            this.userPoolMapping = new UserPoolMapping(plan.getMappings(), this.defaultPool);
        }
        Map<String, PoolState> oldPools = this.pools;
        this.pools = new HashMap<String, PoolState>();
        // Bucket pools by nesting depth so parents are handled before their children.
        List<List<WMPool>> poolsByLevel = new ArrayList<List<WMPool>>();
        if (plan != null) {
            for (WMPool wmPool : plan.getPools()) {
                String fullName = wmPool.getPoolPath();
                int ix = StringUtils.countMatches(fullName, POOL_SEPARATOR_STR);
                while (poolsByLevel.size() <= ix) {
                    poolsByLevel.add(new LinkedList<WMPool>());
                }
                poolsByLevel.get(ix).add(wmPool);
            }
        }
        for (int level = 0; level < poolsByLevel.size(); ++level) {
            for (WMPool pool : poolsByLevel.get(level)) {
                String fullName = pool.getPoolPath();
                int qp = pool.getQueryParallelism();
                double fraction = pool.getAllocFraction();
                if (level > 0) {
                    // The child's fraction is relative to its parent; convert it to an
                    // absolute fraction and deduct it from what the parent has left.
                    String parentName = fullName.substring(0, fullName.lastIndexOf('.'));
                    PoolState parent = this.pools.get(parentName);
                    fraction = parent.finalFraction * fraction;
                    parent.finalFractionRemaining -= fraction;
                }
                PoolState state = oldPools == null ? null : oldPools.remove(fullName);
                if (state == null) {
                    state = new PoolState(fullName, qp, fraction, pool.getSchedulingPolicy(), this.metricsSystem);
                } else {
                    state.update(qp, fraction, syncWork, e, pool.getSchedulingPolicy());
                    poolsToRedistribute.add(fullName);
                }
                state.setTriggers(new LinkedList<Trigger>());
                LOG.info("Adding Hive pool: " + state);
                this.pools.put(fullName, state);
                totalQueryParallelism += qp;
            }
        }
        for (PoolState poolState : this.pools.values()) {
            if (poolState.metrics == null) {
                continue;
            }
            poolState.metrics.setMaxExecutors(this.allocationManager.translateAllocationToCpus(poolState.finalFractionRemaining));
        }
        if (plan != null && plan.isSetTriggers() && plan.isSetPoolTriggers()) {
            Map<String, Trigger> triggers = new HashMap<String, Trigger>();
            for (WMTrigger trigger : plan.getTriggers()) {
                ExecutionTrigger execTrigger = ExecutionTrigger.fromWMTrigger(trigger);
                triggers.put(trigger.getTriggerName(), execTrigger);
            }
            for (WMPoolTrigger poolTrigger : plan.getPoolTriggers()) {
                PoolState pool = this.pools.get(poolTrigger.getPool());
                Trigger trigger = triggers.get(poolTrigger.getTrigger());
                pool.triggers.add(trigger);
                poolsToRedistribute.add(pool.fullName);
                LOG.info("Adding pool " + pool.fullName + " trigger " + trigger);
            }
        }
        // Whatever is left in oldPools was not carried over into the new plan.
        if (oldPools != null && !oldPools.isEmpty()) {
            for (PoolState poolState : oldPools.values()) {
                poolState.destroy(syncWork, e.getRequests, e.toReuse);
            }
        }
        LOG.info("Updating with " + totalQueryParallelism + " total query parallelism");
        int deltaSessions = totalQueryParallelism - this.totalQueryParallelism;
        this.totalQueryParallelism = totalQueryParallelism;
        if (deltaSessions == 0) {
            return;
        }
        if (deltaSessions < 0) {
            // Shrinking: prefer destroying sessions that were going to be killed or
            // restarted anyway over asking the AM pool to give sessions up.
            deltaSessions = WorkloadManager.transferSessionsToDestroy(syncWork.toKillQuery.keySet(), syncWork.toDestroyNoRestart, deltaSessions);
            deltaSessions = WorkloadManager.transferSessionsToDestroy(syncWork.toRestartInUse, syncWork.toDestroyNoRestart, deltaSessions);
        }
        if (deltaSessions != 0) {
            this.failOnFutureFailure(this.tezAmPool.resizeAsync(deltaSessions, syncWork.toDestroyNoRestart));
        }
    }

    /**
     * Moves sessions from {@code source} to {@code toDestroy} to cover a negative
     * session delta.
     *
     * <p>At most {@code -deltaSessions} sessions are moved, in the iteration order of
     * {@code source}. Each moved session is marked irrelevant for WM unless it already
     * is, and is removed from {@code source} through its iterator.
     *
     * @param source sessions that may be destroyed instead of restarted; modified
     * @param toDestroy receives the moved sessions
     * @param deltaSessions the session-count change still to be applied
     * @return {@code deltaSessions} unchanged if it is non-negative, otherwise
     *         {@code deltaSessions} plus the number of sessions moved
     */
    private static int transferSessionsToDestroy(Collection<WmTezSession> source, List<WmTezSession> toDestroy, int deltaSessions) {
        if (deltaSessions >= 0) {
            return deltaSessions;
        }
        int budget = Math.min(-deltaSessions, source.size());
        Iterator<WmTezSession> candidates = source.iterator();
        int moved = 0;
        while (moved < budget) {
            WmTezSession victim = candidates.next();
            LOG.debug("Will destroy {} instead of restarting", (Object)victim);
            if (!victim.isIrrelevantForWm()) {
                victim.setIsIrrelevantForWm("Killed due to workload management plan change");
            }
            toDestroy.add(victim);
            candidates.remove();
            ++moved;
        }
        return deltaSessions + budget;
    }

    /**
     * Registers {@code FATAL_ERROR_CALLBACK} on the given future, using a direct
     * executor so the callback runs on whichever thread completes the future.
     *
     * @param future the future to watch
     */
    private void failOnFutureFailure(ListenableFuture<?> future) {
        Futures.addCallback(
            future,
            FATAL_ERROR_CALLBACK,
            MoreExecutors.directExecutor());
    }

    /**
     * Routes a session request to its pool: either satisfies it immediately by reusing
     * the caller's session, or appends it to the pool's queue.
     *
     * <p>The request fails if no pool mapping exists or the mapped pool is unknown; in
     * both cases a session offered for reuse is returned first. When a session is offered
     * for reuse it is removed from its current pool. The reuse is abandoned (and the
     * session returned) if that removal does not succeed or if the target pool has no
     * free capacity, in which case the request is queued like any other.
     *
     * @param req the request to place
     * @param poolsToRedistribute names of pools that must be re-processed
     * @param syncWork accumulator of work to be done outside this method
     */
    private void queueGetRequestOnMasterThread(GetRequest req, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) {
        String poolName = this.userPoolMapping.mapSessionToPoolName(req.mappingInput, this.allowAnyPool, this.allowAnyPool ? this.pools.keySet() : null);
        if (poolName == null) {
            req.future.setException(new NoPoolMappingException("Cannot find any pool mapping for " + req.mappingInput));
            this.returnSessionOnFailedReuse(req, syncWork, poolsToRedistribute);
            return;
        }
        PoolState pool = this.pools.get(poolName);
        if (pool == null) {
            req.future.setException(new AssertionError(poolName + " not found (internal error)."));
            this.returnSessionOnFailedReuse(req, syncWork, poolsToRedistribute);
            return;
        }
        PoolState oldPool = null;
        if (req.sessionToReuse != null) {
            String oldPoolName = req.sessionToReuse.getPoolName();
            oldPool = this.pools.get(oldPoolName);
            RemoveSessionResult rr = this.checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute, true, false);
            // Capacity is only consulted once the session has actually left its old pool.
            boolean abandonReuse = rr != RemoveSessionResult.OK
                || pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism;
            if (abandonReuse) {
                // oldPool is null when the session had no pool name or its pool no longer
                // exists (removal returned NOT_FOUND or IGNORE); there are no metrics to
                // adjust in that case.
                if (oldPool != null && oldPool.metrics != null) {
                    oldPool.metrics.removeRunningQueries(1);
                }
                this.returnSessionOnFailedReuse(req, syncWork, null);
                req.sessionToReuse = null;
            }
        }
        if (req.sessionToReuse != null) {
            req.sessionToReuse.setPoolName(poolName);
            req.sessionToReuse.setQueueName(this.yarnQueue);
            req.sessionToReuse.setQueryId(req.queryId);
            pool.sessions.add(req.sessionToReuse);
            if (pool != oldPool) {
                poolsToRedistribute.add(poolName);
            }
            req.future.set(req.sessionToReuse);
            return;
        }
        pool.queue.addLast(req);
        if (pool.metrics != null) {
            pool.metrics.addQueuedQuery();
        }
        poolsToRedistribute.add(poolName);
    }

    /**
     * Starts queued queries in a pool up to its free capacity and refreshes the pool's
     * allocations.
     *
     * <p>The number of queries to start is the smaller of the queue length and the
     * pool's unused parallelism. If {@code hasRequeues} is set, the queue is re-sorted
     * with {@code GetRequest.ORDER_COMPARATOR} before requests are taken from its head.
     * A session init context is started for each request; contexts that are not already
     * done are tracked in the pool's initializing set. Finally allocation percentages are
     * recomputed and pushed to the sessions, and executor metrics are updated.
     *
     * @param poolName name of the pool to process; unknown names are ignored
     * @param hasRequeues whether the queue order may have been disturbed
     * @param syncWork accumulator of work to be done outside this method
     * @throws Exception if starting a session initialization fails
     */
    private void processPoolChangesOnMasterThread(String poolName, boolean hasRequeues, WmThreadSyncWork syncWork) throws Exception {
        PoolState pool = this.pools.get(poolName);
        if (pool == null) {
            return;
        }
        int freeSlots = pool.queryParallelism - pool.getTotalActiveSessions();
        int queriesToStart = Math.min(pool.queue.size(), freeSlots);
        if (queriesToStart > 0) {
            LOG.info("Starting {} queries in pool {}", (Object)queriesToStart, (Object)pool);
        }
        if (hasRequeues) {
            Collections.sort(pool.queue, GetRequest.ORDER_COMPARATOR);
        }
        int remaining = queriesToStart;
        while (remaining > 0) {
            --remaining;
            GetRequest next = pool.queue.pollFirst();
            if (pool.metrics != null) {
                pool.metrics.moveQueuedToRunning();
            }
            assert (next.sessionToReuse == null);
            SessionInitContext initCtx = new SessionInitContext(next.future, poolName, next.queryId, next.wmContext, null);
            initCtx.start();
            boolean alreadyDone = initCtx.extractSessionAndCancelIfDone(pool.sessions, syncWork.pathsToDelete);
            if (!alreadyDone) {
                pool.initializingSessions.add(initCtx);
            }
        }
        double totalAlloc = pool.updateAllocationPercentages();
        int cpusAllocated = this.allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
        if (pool.metrics == null) {
            return;
        }
        pool.metrics.setExecutors(cpusAllocated);
        if (cpusAllocated > 0) {
            pool.metrics.setMaxExecutors(this.allocationManager.translateAllocationToCpus(totalAlloc));
        }
    }

    /**
     * Gives back the session attached to a request whose reuse attempt was abandoned.
     *
     * <p>Does nothing when the request carries no session. Otherwise the session is
     * detached from the request and its query id cleared. When
     * {@code poolsToRedistribute} is non-null the session is first removed from its
     * pool, and processing stops unless that removal returns {@code OK}. The session is
     * then handed to the AM pool; if the AM pool rejects it, it is queued in
     * {@code syncWork.toDestroyNoRestart}.
     *
     * @param req the request whose session should be returned
     * @param syncWork accumulator of work to be done outside this method
     * @param poolsToRedistribute pools to re-process, or {@code null} to skip the pool
     *        removal step
     */
    private void returnSessionOnFailedReuse(GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
        WmTezSession session = req.sessionToReuse;
        if (session == null) {
            return;
        }
        req.sessionToReuse = null;
        session.setQueryId(null);
        if (poolsToRedistribute != null) {
            RemoveSessionResult removal = this.checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, true, true);
            boolean removedOk = removal == RemoveSessionResult.OK;
            assert (removedOk || removal == RemoveSessionResult.IGNORE);
            if (!removedOk) {
                return;
            }
        }
        WmEvent wmEvent = new WmEvent(WmEvent.EventType.RETURN);
        if (this.tezAmPool.returnSessionAsync(session)) {
            if (session.getWmContext() != null && session.getWmContext().isQueryCompleted()) {
                session.resolveReturnFuture();
            }
            wmEvent.endEvent(session);
        } else {
            syncWork.toDestroyNoRestart.add(session);
        }
    }

    /**
     * Removes a session from the pool it is currently assigned to.
     *
     * @param session the session to remove
     * @param poolsToRedistribute receives the session's pool name, if it has one
     * @param isSessionOk whether the caller considers the session healthy; when the
     *        session has a kill in progress and this is non-null, its negation is
     *        reported to the kill context as the user-side failure flag
     * @param updateMetrics whether to decrement the pool's running-query metric on
     *        successful removal
     * @return {@code IGNORE} if the session is irrelevant for WM or has a kill in
     *         progress; {@code OK} if it was removed from its pool; {@code NOT_FOUND}
     *         if it has no pool name, the pool is unknown, or the pool did not contain it
     */
    private RemoveSessionResult checkAndRemoveSessionFromItsPool(WmTezSession session, Set<String> poolsToRedistribute, Boolean isSessionOk, boolean updateMetrics) {
        if (session.isIrrelevantForWm()) {
            return RemoveSessionResult.IGNORE;
        }
        if (this.killQueryInProgress.containsKey(session)) {
            if (isSessionOk != null) {
                this.killQueryInProgress.get(session).handleUserCallback(!isSessionOk.booleanValue());
            }
            return RemoveSessionResult.IGNORE;
        }
        String poolName = session.getPoolName();
        boolean removed = false;
        PoolState pool = null;
        if (poolName != null) {
            poolsToRedistribute.add(poolName);
            pool = this.pools.get(poolName);
            // WM state is cleared even if the pool lookup or removal below fails.
            session.clearWm();
            removed = pool != null && pool.sessions.remove(session);
        }
        if (!removed) {
            LOG.error("Session was not in the pool (internal error) " + poolName + ": " + session);
            return RemoveSessionResult.NOT_FOUND;
        }
        if (updateMetrics && pool.metrics != null) {
            pool.metrics.removeRunningQueries(1);
        }
        return RemoveSessionResult.OK;
    }

    /**
     * Adds a session to the named destination pool.
     *
     * @param session the session to add
     * @param destPoolName name of the pool to add the session to
     * @param poolsToRedistribute receives {@code destPoolName} on success
     * @return {@code false} if the session is irrelevant for WM; {@code true} if it was
     *         added (metrics, pool name and triggers updated); {@code null} if the pool
     *         is unknown or did not accept the session
     */
    private Boolean checkAndAddSessionToAnotherPool(WmTezSession session, String destPoolName, Set<String> poolsToRedistribute) {
        if (session.isIrrelevantForWm()) {
            LOG.error("Unexpected during add session to another pool. If remove failed this should not have been called.");
            return false;
        }
        PoolState destPool = this.pools.get(destPoolName);
        boolean added = destPool != null && destPool.sessions.add(session);
        if (!added) {
            LOG.error("Session {} was not added to pool {}", (Object)session, (Object)destPoolName);
            return null;
        }
        if (destPool.metrics != null) {
            destPool.metrics.addRunningQuery();
        }
        session.setPoolName(destPoolName);
        this.updateTriggers(session);
        poolsToRedistribute.add(destPoolName);
        return true;
    }

    /**
     * Queues a resource plan to be applied and wakes the WM thread.
     *
     * <p>If another non-null plan is still pending, its future is failed with a
     * {@code HiveException} and the new plan replaces it. A {@code null} plan is
     * recorded as a request to clear the resource plan.
     *
     * @param plan the plan to apply, or {@code null} to clear the current plan
     * @return a new future that is stored as the pending apply-plan future
     */
    public ListenableFuture<Boolean> updateResourcePlanAsync(WMFullResourcePlan plan) {
        SettableFuture<Boolean> applyRpFuture = SettableFuture.create();
        this.currentLock.lock();
        try {
            if (this.current.resourcePlanToApply != null) {
                LOG.warn("Several resource plans are being applied at the same time; using the latest");
                this.current.applyRpFuture.setException(new HiveException("Another plan was applied in parallel"));
            }
            this.current.applyRpFuture = applyRpFuture;
            // A null plan doubles as the "clear" signal.
            this.current.resourcePlanToApply = plan;
            this.current.doClearResourcePlan = plan == null;
            this.notifyWmThreadUnderLock();
        }
        finally {
            this.currentLock.unlock();
        }
        return applyRpFuture;
    }

    /**
     * Queues a request to move a session to another pool and wakes the WM thread.
     *
     * @param srcSession the session to move
     * @param destPoolName name of the destination pool
     * @return the future carried by the queued move request
     */
    Future<Boolean> applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) {
        this.currentLock.lock();
        try {
            MoveSession request = new MoveSession(srcSession, destPoolName);
            this.current.moveSessions.add(request);
            LOG.info("Queued move session: {}", (Object)request);
            this.notifyWmThreadUnderLock();
            return request.future;
        }
        finally {
            this.currentLock.unlock();
        }
    }

    /**
     * Queues a session for kill and wakes the WM thread.
     *
     * <p>While holding {@code currentLock}, a kill context is created and registered
     * through {@code resetAndQueueKill}, which also detaches any pending reuse request
     * for the session.
     *
     * @param wmTezSession the session to kill
     * @param killReason the reason recorded in the kill context
     * @return the kill context's future
     */
    Future<Boolean> applyKillSessionAsync(WmTezSession wmTezSession, String killReason) {
        this.currentLock.lock();
        try {
            KillQueryContext killCtx = new KillQueryContext(wmTezSession, killReason);
            this.resetAndQueueKill(this.syncWork.toKillQuery, killCtx, this.current.toReuse);
            LOG.info("Queued session for kill: {}", (Object)killCtx.session);
            this.notifyWmThreadUnderLock();
            return killCtx.killSessionFuture;
        }
        finally {
            this.currentLock.unlock();
        }
    }

    /**
     * Convenience overload of
     * {@link #getSession(TezSessionState, UserPoolMapping.MappingInput, HiveConf, WmContext)}
     * that passes a {@code null} WM context.
     *
     * @param session an existing session offered for reuse; may be {@code null}
     * @param input the mapping input used to choose a pool
     * @param conf the configuration of the requesting query
     * @return the session obtained for the request
     * @throws Exception if the request fails
     */
    @VisibleForTesting
    public WmTezSession getSession(TezSessionState session, UserPoolMapping.MappingInput input, HiveConf conf) throws Exception {
        final WmContext noContext = null;
        return this.getSession(session, input, conf, noContext);
    }

    /**
     * Requests a session and blocks until the request's future completes.
     *
     * <p>The configuration is validated first. If {@code session} qualifies for reuse
     * (see {@code checkSessionForReuse}) it is attached to the request and registered in
     * the pending reuse map. The request is queued under {@code currentLock} and the WM
     * thread is woken.
     *
     * @param session an existing session offered for reuse; may be {@code null}
     * @param input the mapping input used to choose a pool
     * @param conf the configuration of the requesting query; the query id is read from it
     * @param wmContext the WM context to associate with the request; may be {@code null}
     * @return the session the request was completed with
     * @throws Exception the cause of a failed request when that cause is an
     *         {@link Exception}, otherwise the wrapping {@link ExecutionException};
     *         also anything thrown by validation or while waiting
     */
    public WmTezSession getSession(TezSessionState session, UserPoolMapping.MappingInput input, HiveConf conf, WmContext wmContext) throws Exception {
        WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET);
        this.validateConfig(conf);
        String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
        SettableFuture<WmTezSession> future = SettableFuture.create();
        WmTezSession reusable = this.checkSessionForReuse(session);
        GetRequest req = new GetRequest(input, queryId, future, reusable, this.getRequestVersion.incrementAndGet(), wmContext);
        this.currentLock.lock();
        try {
            this.current.getRequests.add(req);
            if (req.sessionToReuse != null) {
                this.current.toReuse.put(reusable, req);
            }
            this.notifyWmThreadUnderLock();
        }
        finally {
            this.currentLock.unlock();
        }
        try {
            WmTezSession acquired = future.get();
            wmEvent.endEvent(acquired);
            return acquired;
        }
        catch (ExecutionException ex) {
            // Surface the underlying failure rather than the wrapper when possible.
            Throwable cause = ex.getCause();
            if (cause instanceof Exception) {
                throw (Exception)cause;
            }
            throw ex;
        }
    }

    /**
     * Queues a session for destruction and wakes the WM thread.
     *
     * <p>The session must be owned by this manager. It is also cleared from the current
     * thread's {@code SessionState} if it is set there.
     *
     * @param session the session to destroy
     * @throws Exception declared by the overridden method
     */
    @Override
    public void destroy(TezSessionState session) throws Exception {
        WmTezSession owned = this.ensureOwnedSession(session);
        this.resetGlobalTezSession(owned);
        this.currentLock.lock();
        try {
            this.current.toDestroy.add(owned);
            this.notifyWmThreadUnderLock();
        }
        finally {
            this.currentLock.unlock();
        }
    }

    /**
     * Clears the Tez session held by the current {@code SessionState} when it is the
     * given session (compared by identity).
     *
     * @param wmTezSession the session that is going away
     */
    private void resetGlobalTezSession(WmTezSession wmTezSession) {
        SessionState current = SessionState.get();
        if (current == null) {
            return;
        }
        if (current.getTezSession() == wmTezSession) {
            current.setTezSession(null);
        }
    }

    /**
     * Queues a session to be returned after use and wakes the WM thread.
     *
     * <p>The session must be owned by this manager. Its return future is created under
     * {@code currentLock} before it is queued.
     *
     * @param session the session being returned
     * @throws Exception declared by the overridden method
     */
    @Override
    public void returnAfterUse(TezSessionPoolSession session) throws Exception {
        WmTezSession owned = this.ensureOwnedSession(session);
        this.resetGlobalTezSession(owned);
        this.currentLock.lock();
        try {
            owned.createAndSetReturnFuture();
            this.current.toReturn.add(owned);
            this.notifyWmThreadUnderLock();
        }
        finally {
            this.currentLock.unlock();
        }
    }

    /**
     * Asks the allocation manager to update the given session asynchronously.
     *
     * @param session the session to update
     */
    public void notifyOfInconsistentAllocation(WmTezSession session) {
        allocationManager.updateSessionAsync(session);
    }

    /**
     * Flags a cluster state change in the pending event state and wakes the WM thread.
     */
    public void notifyOfClusterStateChange() {
        currentLock.lock();
        try {
            current.hasClusterStateChanged = true;
            notifyWmThreadUnderLock();
        }
        finally {
            currentLock.unlock();
        }
    }

    /**
     * Records an allocation-update error for a session and wakes the WM thread.
     *
     * <p>An error is ignored when one is already recorded for the same session with an
     * equal or higher endpoint version.
     *
     * @param wmTezSession the session whose update failed
     * @param endpointVersion the endpoint version the failed update was sent to
     */
    void addUpdateError(WmTezSession wmTezSession, int endpointVersion) {
        this.currentLock.lock();
        try {
            Integer recorded = this.current.updateErrors.get(wmTezSession);
            boolean alreadyKnown = recorded != null && recorded >= endpointVersion;
            if (!alreadyKnown) {
                this.current.updateErrors.put(wmTezSession, endpointVersion);
                this.notifyWmThreadUnderLock();
            }
        }
        finally {
            this.currentLock.unlock();
        }
    }

    /**
     * Returns a textual dump of the workload manager state.
     *
     * <p>If a dump is already pending its future is shared; otherwise a new future is
     * registered and the WM thread is woken. The call then blocks until the future
     * completes.
     *
     * @return the state description, or a single-element list starting with
     *         {@code "Error: "} if waiting failed or was interrupted; on interruption
     *         the calling thread's interrupt status is restored
     */
    @Override
    public List<String> getWmStateDescription() {
        SettableFuture<List<String>> future;
        this.currentLock.lock();
        try {
            if (this.current.dumpStateFuture == null) {
                this.current.dumpStateFuture = SettableFuture.create();
                this.notifyWmThreadUnderLock();
            }
            future = this.current.dumpStateFuture;
        }
        finally {
            this.currentLock.unlock();
        }
        try {
            return future.get();
        }
        catch (InterruptedException e) {
            // Do not swallow the interrupt: callers up the stack may rely on it.
            Thread.currentThread().interrupt();
            LOG.error("Error getting description", (Throwable)e);
            return Lists.newArrayList("Error: " + e.toString());
        }
        catch (ExecutionException e) {
            LOG.error("Error getting description", (Throwable)e);
            return Lists.newArrayList("Error: " + e.toString());
        }
    }

    /**
     * Records the outcome of a kill-query attempt and wakes the WM thread.
     *
     * @param toKill the session whose query was being killed
     * @param success the outcome flag stored for the session
     */
    private void addKillQueryResult(WmTezSession toKill, boolean success) {
        currentLock.lock();
        try {
            current.killQueryResults.put(toKill, success);
            notifyWmThreadUnderLock();
        }
        finally {
            currentLock.unlock();
        }
    }

    /**
     * Installs a fresh test-event future in the pending event state and wakes the WM
     * thread.
     *
     * @return the installed future
     */
    @VisibleForTesting
    Future<Boolean> addTestEvent() {
        SettableFuture<Boolean> marker = SettableFuture.create();
        currentLock.lock();
        try {
            current.testEvent = marker;
            notifyWmThreadUnderLock();
        }
        finally {
            currentLock.unlock();
        }
        return marker;
    }

    /**
     * Queues a finished session-initialization context for processing and wakes the WM
     * thread.
     *
     * @param initCtx the initialization context that has completed
     */
    public void notifyInitializationCompleted(SessionInitContext initCtx) {
        currentLock.lock();
        try {
            current.initResults.add(initCtx);
            notifyWmThreadUnderLock();
        }
        finally {
            currentLock.unlock();
        }
    }

    /**
     * Requests that a session be reopened and blocks until the request's future
     * completes.
     *
     * <p>The session must be owned by this manager. A warning is logged if the session
     * has no configuration. Only one reopen may be pending per session at a time.
     *
     * @param session the session to reopen
     * @return the session the reopen future was completed with
     * @throws AssertionError if a reopen is already pending for the session
     * @throws Exception if waiting on the future fails
     */
    @Override
    public TezSessionState reopen(TezSessionState session) throws Exception {
        WmTezSession wmTezSession = this.ensureOwnedSession(session);
        if (wmTezSession.getConf() == null) {
            // Diagnostic only; the configuration is not used by the reopen request.
            LOG.warn("Session configuration is null for " + wmTezSession);
        }
        SettableFuture<WmTezSession> future = SettableFuture.create();
        this.currentLock.lock();
        try {
            if (this.current.toReopen.containsKey(wmTezSession)) {
                throw new AssertionError("The session is being reopened more than once " + session);
            }
            this.current.toReopen.put(wmTezSession, future);
            this.notifyWmThreadUnderLock();
        }
        finally {
            this.currentLock.unlock();
        }
        return future.get();
    }

    /**
     * Asks the AM pool to replace an expired session. The session must be owned by this
     * manager.
     *
     * @param session the expired session
     * @throws Exception if the replacement fails
     */
    @Override
    public void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception {
        WmTezSession expired = this.ensureOwnedSession(session);
        this.tezAmPool.replaceSession(expired);
    }

    /**
     * Marks that there are pending changes and signals all waiters on
     * {@code hasChangesCondition}, unless the flag is already set.
     *
     * <p>As the name indicates, callers in this file invoke it while holding
     * {@code currentLock}.
     */
    private void notifyWmThreadUnderLock() {
        if (!this.hasChanges) {
            this.hasChanges = true;
            this.hasChangesCondition.signalAll();
        }
    }

    /**
     * Decides whether an existing session can be reused by a new request.
     *
     * <p>Only a {@code WmTezSession} owned by this manager is reusable. Any other
     * session is disposed of: pool sessions and foreign WM sessions are returned to
     * their session manager, anything else is closed.
     *
     * @param session the session offered for reuse; may be {@code null}
     * @return the session as a {@code WmTezSession} if reusable, otherwise {@code null}
     * @throws Exception if returning or closing a non-reusable session fails
     */
    private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception {
        if (session == null) {
            return null;
        }
        if (!(session instanceof WmTezSession)) {
            LOG.warn("Attempting to reuse a non-WM session for workload management:" + session);
            if (session instanceof TezSessionPoolSession) {
                session.returnToSessionManager();
            } else {
                session.close(false);
            }
            return null;
        }
        WmTezSession candidate = (WmTezSession)session;
        if (candidate.isOwnedBy(this)) {
            return candidate;
        }
        LOG.warn("Attempting to reuse a session not belonging to us: " + candidate);
        candidate.returnToSessionManager();
        return null;
    }

    /**
     * Validates (and partially rewrites) a query configuration.
     *
     * <p>A non-empty {@code tez.queue.name} is overwritten with this manager's YARN
     * queue after a warning. The configuration is rejected if doAs is enabled, and is
     * finally checked against the restricted configuration when one is set.
     *
     * @param conf the configuration to validate; may be modified
     * @throws HiveException if doAs is enabled (or, presumably, if the restricted
     *         configuration check fails — confirm against {@code restrictedConfig})
     */
    private void validateConfig(HiveConf conf) throws HiveException {
        String requestedQueue = conf.get("tez.queue.name");
        boolean queueRequested = requestedQueue != null && !requestedQueue.isEmpty();
        if (queueRequested) {
            LOG.warn("Ignoring tez.queue.name=" + requestedQueue);
            conf.set("tez.queue.name", this.yarnQueue);
        }
        boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
        if (doAsEnabled) {
            throw new HiveException(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + " is not supported");
        }
        if (this.restrictedConfig == null) {
            return;
        }
        this.restrictedConfig.validate(conf);
    }

    /**
     * Creates a new session object with a freshly generated session id, assigns it to
     * this manager's YARN queue and marks it as default.
     *
     * @param conf configuration for the session; forwarded to
     *        {@code createSessionObject}
     * @return the new session object
     */
    private WmTezSession createSession(HiveConf conf) {
        String sessionId = TezSessionState.makeSessionId();
        WmTezSession created = this.createSessionObject(sessionId, conf);
        created.setQueueName(this.yarnQueue);
        created.setDefault();
        LOG.info("Created new interactive session object " + created.getSessionId());
        return created;
    }

    /**
     * Instantiates a {@code WmTezSession}.
     *
     * <p>When {@code conf} is {@code null} a copy of this manager's configuration is
     * used. In either case {@code llap.plugin.endpoint.enabled} is set to {@code "true"}
     * on the configuration that is used, so a caller-supplied instance is modified.
     *
     * @param sessionId id for the new session
     * @param conf configuration for the session, or {@code null}
     * @return the new session
     */
    @VisibleForTesting
    protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
        if (conf == null) {
            conf = new HiveConf(this.conf);
        }
        conf.set("llap.plugin.endpoint.enabled", "true");
        return new WmTezSession(sessionId, this, this.expirationTracker, conf);
    }

    /**
     * Casts a session to {@code WmTezSession}, verifying that it is owned by this
     * manager.
     *
     * @param oldSession the session to check
     * @return {@code oldSession} as a {@code WmTezSession}
     * @throws AssertionError if the session is not a {@code WmTezSession} or is not
     *         owned by this manager
     */
    private WmTezSession ensureOwnedSession(TezSessionState oldSession) {
        if (oldSession instanceof WmTezSession) {
            WmTezSession candidate = (WmTezSession)oldSession;
            if (candidate.isOwnedBy(this)) {
                return candidate;
            }
        }
        throw new AssertionError("Not a WM session " + oldSession);
    }

    /**
     * Adds a session to the set of open sessions, synchronizing on the
     * {@code openSessions} map.
     *
     * @param session the session that was opened
     */
    @Override
    public void registerOpenSession(TezSessionPoolSession session) {
        synchronized (this.openSessions) {
            this.openSessions.put(session, Boolean.TRUE);
        }
    }

    /**
     * Removes a session from the set of open sessions (synchronizing on the
     * {@code openSessions} map) and then notifies the AM pool that it was closed.
     *
     * @param session the session that was closed
     */
    @Override
    public void unregisterOpenSession(TezSessionPoolSession session) {
        synchronized (this.openSessions) {
            this.openSessions.remove(session);
        }
        // The pool notification happens outside the monitor.
        this.tezAmPool.notifyClosed(session);
    }

    /**
     * @return the session expiration tracker held by this manager
     */
    @VisibleForTesting
    public SessionExpirationTracker getExpirationTracker() {
        return expirationTracker;
    }

    /**
     * @return the initial size reported by the AM pool
     */
    @VisibleForTesting
    int getNumSessions() {
        return tezAmPool.getInitialSize();
    }

    /**
     * @return the configuration this manager holds
     */
    protected final HiveConf getConf() {
        return conf;
    }

    /**
     * Adds the triggers of the session's current pool to the session's WM context.
     *
     * <p>Does nothing if the session has no WM context or its pool is unknown.
     *
     * @param session the session whose triggers should be refreshed
     */
    void updateTriggers(WmTezSession session) {
        WmContext wmContext = session.getWmContext();
        String poolName = session.getPoolName();
        PoolState poolState = this.pools.get(poolName);
        if (wmContext == null || poolState == null) {
            return;
        }
        wmContext.addTriggers(poolState.getTriggers());
        LOG.info("Subscribed to counters: {}", wmContext.getSubscribedCounters());
    }

    /**
     * @return the trigger validator runnable held by this manager
     */
    @Override
    Runnable getTriggerValidatorRunnable() {
        return triggerValidatorRunnable;
    }

    /**
     * Reports whether workload management applies to the given input.
     *
     * <p>The result depends only on whether a user-pool mapping is present. The mapped
     * pool is computed for logging and does not influence the return value.
     *
     * @param input the mapping input to evaluate
     * @return {@code true} if a user-pool mapping exists, {@code false} otherwise
     */
    boolean isManaged(UserPoolMapping.MappingInput input) {
        // Read the field once so the null check and the use see the same instance.
        UserPoolMapping mapping = this.userPoolMapping;
        if (mapping == null) {
            return false;
        }
        String mappedPool = mapping.mapSessionToPoolName(input, this.allowAnyPool, null);
        LOG.info("Mapping input: {} mapped to pool: {}", (Object)input, (Object)mappedPool);
        return true;
    }

    /**
     * Registers a kill context for its session, clears the session's WM state, and
     * detaches the session from any pending reuse request.
     *
     * @param toKillQuery map of sessions to kill; receives the context
     * @param killQueryContext the kill context to register
     * @param toReuse pending reuse requests keyed by session; the session's entry is
     *        removed
     */
    private static void resetRemovedSessionToKill(Map<WmTezSession, KillQueryContext> toKillQuery, KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) {
        WmTezSession victim = killQueryContext.session;
        toKillQuery.put(victim, killQueryContext);
        victim.clearWm();
        GetRequest pendingReuse = toReuse.remove(victim);
        if (pendingReuse == null) {
            return;
        }
        pendingReuse.sessionToReuse = null;
    }

    /**
     * Registers a kill context for its session and detaches the session from WM
     * bookkeeping.
     *
     * <p>If the session's pool is known, the session is removed from the pool's session
     * collection and the first initializing context that refers to it (by identity) is
     * dropped. The session's WM state is then cleared and any pending reuse request for
     * it is detached.
     *
     * @param toKillQuery map of sessions to kill; receives the context
     * @param killQueryContext the kill context to register
     * @param toReuse pending reuse requests keyed by session; the session's entry is
     *        removed
     */
    private void resetAndQueueKill(Map<WmTezSession, KillQueryContext> toKillQuery, KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) {
        WmTezSession toKill = killQueryContext.session;
        toKillQuery.put(toKill, killQueryContext);
        String poolName = toKill.getPoolName();
        PoolState poolState = poolName == null ? null : this.pools.get(poolName);
        if (poolState != null) {
            poolState.getSessions().remove(toKill);
            for (Iterator<?> it = poolState.getInitializingSessions().iterator(); it.hasNext(); ) {
                SessionInitContext initCtx = (SessionInitContext)it.next();
                if (initCtx.session == toKill) {
                    it.remove();
                    break;
                }
            }
        }
        toKill.clearWm();
        GetRequest pendingReuse = toReuse.remove(toKill);
        if (pendingReuse != null) {
            pendingReuse.sessionToReuse = null;
        }
    }

    /**
     * @return the AM session pool held by this manager
     */
    @VisibleForTesting
    TezSessionPool<WmTezSession> getTezAmPool() {
        return tezAmPool;
    }

    /**
     * Checked exception used to fail a session request for which no pool mapping could
     * be found.
     */
    public static final class NoPoolMappingException extends Exception {
        private static final long serialVersionUID = 346375346724L;

        /**
         * @param message the detail message
         */
        public NoPoolMappingException(String message) {
            super(message);
        }
    }

    /**
     * Tracks the two halves of killing a session's query: the kill operation itself and
     * the user-side callback for the session. {@link #process()} combines both into a
     * {@code KillQueryResult}.
     */
    static final class KillQueryContext {
        private SettableFuture<Boolean> killSessionFuture;
        private final String reason;
        private final WmTezSession session;
        // Completion and failure flags for the user side and the kill side; all start false.
        private boolean isUserDone;
        private boolean isKillDone;
        private boolean hasKillFailed;
        private boolean hasUserFailed;

        /**
         * @param session the session whose query is being killed
         * @param reason the reason for the kill; used when the session is marked
         *        irrelevant for WM
         */
        KillQueryContext(WmTezSession session, String reason) {
            this.session = session;
            this.reason = reason;
            this.killSessionFuture = SettableFuture.create();
        }

        /** Records that the kill operation has finished, and whether it failed. */
        private void handleKillQueryCallback(boolean hasFailed) {
            this.hasKillFailed = hasFailed;
            this.isKillDone = true;
        }

        /**
         * Records the user-side outcome. Only the first call has an effect; later calls
         * are logged and ignored.
         */
        private void handleUserCallback(boolean hasFailed) {
            if (!this.isUserDone) {
                this.isUserDone = true;
                this.hasUserFailed = hasFailed;
                return;
            }
            LOG.warn("Duplicate user call for a session being killed; ignoring");
        }

        /**
         * Evaluates the current state.
         *
         * @return {@code RESTART_REQUIRED} (after marking the session irrelevant for WM)
         *         if the kill failed and the user side either has not reported yet or
         *         also failed; {@code IN_PROGRESS} if either side is still outstanding;
         *         {@code OK} otherwise
         */
        private KillQueryResult process() {
            boolean killFailedBeforeUser = this.hasKillFailed && !this.isUserDone;
            if (killFailedBeforeUser) {
                this.session.setIsIrrelevantForWm(this.reason);
                return KillQueryResult.RESTART_REQUIRED;
            }
            boolean bothDone = this.isUserDone && this.isKillDone;
            if (!bothDone) {
                return KillQueryResult.IN_PROGRESS;
            }
            boolean bothFailed = this.hasUserFailed && this.hasKillFailed;
            if (bothFailed) {
                this.session.setIsIrrelevantForWm(this.reason);
                return KillQueryResult.RESTART_REQUIRED;
            }
            return KillQueryResult.OK;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("KillQueryContext [isUserDone=");
            sb.append(this.isUserDone);
            sb.append(", isKillDone=").append(this.isKillDone);
            sb.append(", hasKillFailed=").append(this.hasKillFailed);
            sb.append(", hasUserFailed=").append(this.hasUserFailed);
            sb.append(", session=").append(this.session);
            sb.append(", reason=").append(this.reason);
            sb.append("]");
            return sb.toString();
        }
    }

    /**
     * Outcome of evaluating a {@code KillQueryContext} via its {@code process()} method.
     */
    private static enum KillQueryResult {
        /** Both the user side and the kill call have completed, and they did not both fail. */
        OK,
        /** The kill call failed and the user side either has not reported or also failed. */
        RESTART_REQUIRED,
        /** At least one of the two completions is still outstanding. */
        IN_PROGRESS;

    }

    private final class SessionInitContext
    implements FutureCallback<WmTezSession> {
        private static final int MAX_ATTEMPT_NUMBER = 1;
        private final String poolName;
        private final String queryId;
        private final ReentrantLock lock = new ReentrantLock();
        private WmTezSession session;
        private SettableFuture<WmTezSession> future;
        private SessionInitState state = SessionInitState.GETTING;
        private String cancelReason;
        private TezSessionState.HiveResources prelocalizedResources;
        private Path pathToDelete;
        private WmContext wmContext;
        private int attemptNumber = 0;

        public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId, WmContext wmContext, TezSessionState.HiveResources prelocalizedResources) {
            this.future = future;
            this.poolName = poolName;
            this.queryId = queryId;
            this.prelocalizedResources = prelocalizedResources;
            this.wmContext = wmContext;
        }

        public void start() throws Exception {
            ListenableFuture<WmTezSession> getFuture = WorkloadManager.this.tezAmPool.getSessionAsync();
            Futures.addCallback(getFuture, this, MoreExecutors.directExecutor());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         */
        @Override
        public void onSuccess(WmTezSession session) {
            future = null;
            this.lock.lock();
            try {
                oldState = this.state;
                switch (2.$SwitchMap$org$apache$hadoop$hive$ql$exec$tez$WorkloadManager$SessionInitState[oldState.ordinal()]) {
                    case 1: {
                        WorkloadManager.LOG.info("Received a session from AM pool {}", (Object)session);
                        if (!SessionInitContext.$assertionsDisabled && this.state != SessionInitState.GETTING) {
                            throw new AssertionError();
                        }
                        session.setPoolName(this.poolName);
                        session.setQueueName(WorkloadManager.this.yarnQueue);
                        session.setQueryId(this.queryId);
                        if (this.prelocalizedResources != null) {
                            this.pathToDelete = session.replaceHiveResources(this.prelocalizedResources, true);
                        }
                        if (this.wmContext != null) {
                            session.setWmContext(this.wmContext);
                        }
                        this.session = session;
                        this.state = SessionInitState.WAITING_FOR_REGISTRY;
                        ** break;
lbl20:
                        // 1 sources

                        break;
                    }
                    case 2: {
                        if (!SessionInitContext.$assertionsDisabled && this.session == null) {
                            throw new AssertionError();
                        }
                        this.state = SessionInitState.DONE;
                        if (!SessionInitContext.$assertionsDisabled && session != this.session) {
                            throw new AssertionError();
                        }
                        future = this.future;
                        this.future = null;
                        ** break;
lbl30:
                        // 1 sources

                        break;
                    }
                    case 3: {
                        future = this.future;
                        this.session = null;
                        this.future = null;
                        ** break;
lbl36:
                        // 1 sources

                        break;
                    }
                    default: {
                        future = this.future;
                        this.future = null;
                        break;
                    }
                }
            }
            finally {
                this.lock.unlock();
            }
            switch (2.$SwitchMap$org$apache$hadoop$hive$ql$exec$tez$WorkloadManager$SessionInitState[oldState.ordinal()]) {
                case 1: {
                    waitFuture = session.waitForAmRegistryAsync(WorkloadManager.this.amRegistryTimeoutMs, WorkloadManager.this.timeoutPool);
                    Futures.addCallback(waitFuture, this, MoreExecutors.directExecutor());
                    break;
                }
                case 2: {
                    WorkloadManager.this.notifyInitializationCompleted(this);
                    future.set(session);
                    break;
                }
                case 3: {
                    future.setException(new HiveException("The query was killed by workload management: " + this.cancelReason));
                    session.clearWm();
                    session.setQueryId(null);
                    session.setWmContext(null);
                    WorkloadManager.this.tezAmPool.returnSession(session);
                    break;
                }
                default: {
                    error = new AssertionError((Object)("Unexpected state " + this.state));
                    future.setException((Throwable)error);
                    throw error;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onFailure(Throwable t) {
            SettableFuture<WmTezSession> future;
            WmTezSession session;
            boolean wasCanceled = false;
            boolean doRetry = false;
            this.lock.lock();
            try {
                wasCanceled = this.state == SessionInitState.CANCELED;
                session = this.session;
                this.session = null;
                boolean bl = doRetry = !wasCanceled && this.attemptNumber < 1;
                if (doRetry) {
                    ++this.attemptNumber;
                    this.state = SessionInitState.GETTING;
                    future = null;
                } else {
                    future = this.future;
                    this.future = null;
                    if (!wasCanceled) {
                        this.state = SessionInitState.DONE;
                    }
                }
            }
            finally {
                this.lock.unlock();
            }
            if (doRetry) {
                try {
                    this.start();
                    return;
                }
                catch (Exception e) {
                    LOG.error("Failed to retry; propagating original error. The new error is ", (Throwable)e);
                }
                finally {
                    this.discardSessionOnFailure(session);
                }
            }
            if (!wasCanceled) {
                if (LOG.isDebugEnabled()) {
                    LOG.info("Queueing the initialization failure with " + session);
                }
                WorkloadManager.this.notifyInitializationCompleted(this);
            }
            future.setException(t);
            this.discardSessionOnFailure(session);
        }

        public void discardSessionOnFailure(WmTezSession session) {
            if (session == null) {
                return;
            }
            session.clearWm();
            session.setQueryId(null);
            try {
                WorkloadManager.this.tezAmPool.replaceSession(session);
            }
            catch (Exception e) {
                LOG.error("Failed to restart a failed session", (Throwable)e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public WmTezSession cancelAndExtractSessionIfDone(String cancelReason, List<Path> toDelete) {
            this.lock.lock();
            try {
                SessionInitState state = this.state;
                this.state = SessionInitState.CANCELED;
                this.cancelReason = cancelReason;
                if (state == SessionInitState.DONE) {
                    WmTezSession result = this.session;
                    this.session = null;
                    if (this.pathToDelete != null) {
                        toDelete.add(this.pathToDelete);
                    }
                    WmTezSession wmTezSession = result;
                    return wmTezSession;
                }
                if (state == SessionInitState.CANCELED) {
                    LOG.warn("Duplicate call to extract " + this.session);
                }
                WmTezSession wmTezSession = null;
                return wmTezSession;
            }
            finally {
                this.lock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean extractSessionAndCancelIfDone(List<WmTezSession> results, List<Path> toDelete) {
            this.lock.lock();
            try {
                if (this.state != SessionInitState.DONE) {
                    boolean bl = false;
                    return bl;
                }
                this.state = SessionInitState.CANCELED;
                if (this.pathToDelete != null) {
                    toDelete.add(this.pathToDelete);
                }
                if (this.session != null) {
                    results.add(this.session);
                }
                this.session = null;
                boolean bl = true;
                return bl;
            }
            finally {
                this.lock.unlock();
            }
        }

        public String toString() {
            return "[state=" + this.state + ", session=" + this.session + "]";
        }
    }

    /**
     * States of a {@code SessionInitContext}. The declaration order must not change:
     * the compiled switch in {@code SessionInitContext.onSuccess} dispatches on ordinals.
     */
    private static enum SessionInitState {
        /** Initial state, and the state re-entered on retry: a session has been requested from the AM pool. */
        GETTING,
        /** A session was received and configured; waiting for its AM registry future to complete. */
        WAITING_FOR_REGISTRY,
        /** Initialization finished, either successfully or with a failure that was not retried. */
        DONE,
        /** Set by the cancel/extract methods of {@code SessionInitContext}. */
        CANCELED;

    }

    /**
     * Mutable state of one pool: its active sessions, the sessions still initializing, the
     * requests queued on it, its configured parallelism and cluster fraction, its scheduling
     * policy and its triggers.
     */
    private static class PoolState {
        private final LinkedList<SessionInitContext> initializingSessions = new LinkedList<SessionInitContext>();
        private final LinkedList<WmTezSession> sessions = new LinkedList<WmTezSession>();
        private final LinkedList<GetRequest> queue = new LinkedList<GetRequest>();
        private final WmPoolMetrics metrics;
        private final String fullName;
        private double finalFraction;
        private double finalFractionRemaining;
        private int queryParallelism = -1;
        private List<Trigger> triggers = new ArrayList<Trigger>();
        private WMPoolSchedulingPolicy schedulingPolicy;

        /**
         * @param fullName pool name, used in {@link #toString()} and when creating metrics
         * @param queryParallelism configured number of parallel queries
         * @param fraction cluster fraction assigned to this pool
         * @param schedulingPolicy textual policy, parsed by {@code MetaStoreUtils.parseSchedulingPolicy}
         * @param ms metrics system; when null no pool metrics are created
         */
        public PoolState(String fullName, int queryParallelism, double fraction, String schedulingPolicy, MetricsSystem ms) {
            this.fullName = fullName;
            if (ms == null) {
                this.metrics = null;
            } else {
                this.metrics = WmPoolMetrics.create(fullName, ms);
            }
            // No sync work or event state exists yet; update() only touches them when sessions
            // must be killed or the queue is non-empty.
            this.update(queryParallelism, fraction, null, null, schedulingPolicy);
        }

        /** @return active sessions plus sessions still initializing */
        public int getTotalActiveSessions() {
            return this.initializingSessions.size() + this.sessions.size();
        }

        /**
         * Applies new settings to the pool.
         *
         * <p>If the pool now holds more sessions than the new parallelism allows, every session
         * is extracted for killing. All queued requests are then moved, order preserved, to the
         * front of {@code e.getRequests}.
         *
         * @param queryParallelism new parallelism
         * @param fraction new cluster fraction; also resets the remaining fraction
         * @param syncWork receives kill contexts and paths to delete when sessions are extracted
         * @param e event state supplying {@code toReuse} and receiving the re-queued requests
         * @param schedulingPolicy textual policy; an unparseable value is logged and replaced by FAIR
         */
        public void update(int queryParallelism, double fraction, WmThreadSyncWork syncWork, EventState e, String schedulingPolicy) {
            this.finalFractionRemaining = fraction;
            this.finalFraction = fraction;
            this.queryParallelism = queryParallelism;
            final boolean hasMetrics = this.metrics != null;
            if (hasMetrics) {
                this.metrics.setParallelQueries(queryParallelism);
            }
            try {
                this.schedulingPolicy = MetaStoreUtils.parseSchedulingPolicy(schedulingPolicy);
            }
            catch (IllegalArgumentException ex) {
                LOG.error("Unknown scheduling policy " + schedulingPolicy + "; using FAIR");
                this.schedulingPolicy = WMPoolSchedulingPolicy.FAIR;
            }
            if (this.getTotalActiveSessions() > queryParallelism) {
                this.extractAllSessionsToKill("The query pool was resized by administrator", e.toReuse, syncWork);
            }
            if (hasMetrics) {
                this.metrics.removeQueuedQueries(this.queue.size());
            }
            // Drain back-to-front while pushing to the front, so relative order is kept.
            for (GetRequest queued = this.queue.pollLast(); queued != null; queued = this.queue.pollLast()) {
                e.getRequests.addFirst(queued);
            }
        }

        /**
         * Tears the pool down: extracts every session for killing, moves the queued requests to
         * the front of {@code globalQueue}, and destroys the pool metrics if present.
         */
        public void destroy(WmThreadSyncWork syncWork, LinkedList<GetRequest> globalQueue, IdentityHashMap<WmTezSession, GetRequest> toReuse) {
            this.extractAllSessionsToKill("The query pool was removed by administrator", toReuse, syncWork);
            globalQueue.addAll(0, this.queue);
            if (this.metrics != null) {
                int queuedCount = this.queue.size();
                this.metrics.removeQueuedQueries(queuedCount);
                this.metrics.destroy();
            }
            this.queue.clear();
        }

        /**
         * Redistributes the pool's remaining fraction over its active sessions.
         *
         * <p>FAIR: each active or initializing session is assigned an equal share; only active
         * sessions are updated, and the shares of initializing sessions are excluded from the
         * returned total. FIFO: the first active session receives the whole remaining fraction
         * and the others receive zero.
         *
         * @return the fraction handed out to active sessions, or 0 when there is nothing to update
         */
        public double updateAllocationPercentages() {
            switch (this.schedulingPolicy) {
                case FAIR: {
                    int sessionCount = this.getTotalActiveSessions();
                    if (sessionCount == 0) {
                        return 0.0;
                    }
                    double share = this.finalFractionRemaining / (double)sessionCount;
                    for (WmTezSession active : this.sessions) {
                        this.updateSessionAllocationWithEvent(active, share);
                    }
                    double reservedForInit = share * (double)this.initializingSessions.size();
                    return this.finalFractionRemaining - reservedForInit;
                }
                case FIFO: {
                    Iterator<WmTezSession> ordered = this.sessions.iterator();
                    if (!ordered.hasNext()) {
                        return 0.0;
                    }
                    this.updateSessionAllocationWithEvent(ordered.next(), this.finalFractionRemaining);
                    while (ordered.hasNext()) {
                        this.updateSessionAllocationWithEvent(ordered.next(), 0.0);
                    }
                    return this.finalFractionRemaining;
                }
            }
            throw new AssertionError((Object)("Unexpected enum value " + this.schedulingPolicy));
        }

        /**
         * Sets the session's cluster fraction. An UPDATE event is created and ended only when the
         * session has a WM context, already had a fraction, and that fraction differs from the
         * new one beyond the fuzzy tolerance.
         */
        private void updateSessionAllocationWithEvent(WmTezSession session, double allocation) {
            boolean allocationChanged = session.getWmContext() != null
                && session.hasClusterFraction()
                && !DoubleMath.fuzzyEquals(session.getClusterFraction(), allocation, 1.0E-4f);
            // The event is created before the fraction is changed and ended afterwards.
            WmEvent updateEvent = allocationChanged ? new WmEvent(WmEvent.EventType.UPDATE) : null;
            session.setClusterFraction(allocation);
            if (updateEvent != null) {
                updateEvent.endEvent(session);
            }
        }

        /** @return the live list of active sessions (not a copy) */
        public LinkedList<WmTezSession> getSessions() {
            return this.sessions;
        }

        /** @return the live list of initializing session contexts (not a copy) */
        public LinkedList<SessionInitContext> getInitializingSessions() {
            return this.initializingSessions;
        }

        @Override
        public String toString() {
            double usedByChildren = this.finalFraction - this.finalFractionRemaining;
            StringBuilder text = new StringBuilder("[");
            text.append(this.fullName);
            text.append(", query parallelism ").append(this.queryParallelism);
            text.append(", fraction of the cluster ").append(this.finalFraction);
            text.append(", fraction used by child pools ").append(usedByChildren);
            text.append(", active sessions ").append(this.sessions.size());
            text.append(", initializing sessions ").append(this.initializingSessions.size());
            return text.append("]").toString();
        }

        /**
         * Empties both session lists. Active sessions are all queued for killing; initializing
         * contexts are canceled, and those that had already finished have their session queued
         * for killing as well. Running-query metrics are reduced by the number removed.
         */
        private void extractAllSessionsToKill(String killReason, IdentityHashMap<WmTezSession, GetRequest> toReuse, WmThreadSyncWork syncWork) {
            // Capture the count before the lists are cleared below.
            int removedCount = this.getTotalActiveSessions();
            for (WmTezSession active : this.sessions) {
                KillQueryContext killCtx = new KillQueryContext(active, killReason);
                WorkloadManager.resetRemovedSessionToKill(syncWork.toKillQuery, killCtx, toReuse);
            }
            this.sessions.clear();
            for (SessionInitContext pending : this.initializingSessions) {
                WmTezSession finished = pending.cancelAndExtractSessionIfDone(killReason, syncWork.pathsToDelete);
                if (finished != null) {
                    KillQueryContext killCtx = new KillQueryContext(finished, killReason);
                    WorkloadManager.resetRemovedSessionToKill(syncWork.toKillQuery, killCtx, toReuse);
                }
            }
            this.initializingSessions.clear();
            if (this.metrics != null) {
                this.metrics.removeRunningQueries(removedCount);
            }
        }

        /** Replaces the trigger list; the given list is stored as-is. */
        public void setTriggers(LinkedList<Trigger> triggers) {
            this.triggers = triggers;
        }

        /** @return the current trigger list (not a copy) */
        public List<Trigger> getTriggers() {
            return this.triggers;
        }
    }

    /**
     * A pending request for a session: the mapping input, the query id, the future to complete,
     * an optional session to reuse, and a sequence number used for ordering.
     */
    private static final class GetRequest {
        /** Orders requests by ascending {@code order}. */
        public static final Comparator<GetRequest> ORDER_COMPARATOR =
            (left, right) -> Long.compare(left.order, right.order);
        private final long order;
        private final UserPoolMapping.MappingInput mappingInput;
        private final SettableFuture<WmTezSession> future;
        // Not final: cleared when the session to reuse is killed before the request is served.
        private WmTezSession sessionToReuse;
        private final String queryId;
        private final WmContext wmContext;

        /**
         * @param mappingInput input for pool mapping; must not be null (checked with an assertion)
         * @param queryId id of the requesting query
         * @param future future associated with this request
         * @param sessionToReuse session the caller wants to reuse, or null
         * @param order sequence number compared by {@link #ORDER_COMPARATOR}
         * @param wmContext WM context of the request
         */
        private GetRequest(UserPoolMapping.MappingInput mappingInput, String queryId, SettableFuture<WmTezSession> future, WmTezSession sessionToReuse, long order, WmContext wmContext) {
            assert (mappingInput != null);
            this.order = order;
            this.mappingInput = mappingInput;
            this.future = future;
            this.sessionToReuse = sessionToReuse;
            this.queryId = queryId;
            this.wmContext = wmContext;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("[#");
            text.append(this.order);
            text.append(", ").append(this.mappingInput);
            text.append(", reuse ").append(this.sessionToReuse);
            return text.append("]").toString();
        }
    }

    /**
     * Result codes for a session-removal attempt. The code that produces and consumes them is
     * outside this section of the file.
     * NOTE(review): the per-constant meanings are presumed from the names — presumably OK means
     * removed, IGNORE means deliberately skipped, and NOT_FOUND means the session was not
     * tracked; confirm at the call sites.
     */
    private static enum RemoveSessionResult {
        OK,
        IGNORE,
        NOT_FOUND;

    }

    /**
     * Accumulates work items: sessions to restart or destroy, queries to kill, and paths to
     * delete. All collections start empty.
     */
    private static final class WmThreadSyncWork {
        private List<WmTezSession> toRestartInUse = new LinkedList<>();
        private List<WmTezSession> toDestroyNoRestart = new LinkedList<>();
        /** Kill contexts keyed by session identity; filled by {@code resetRemovedSessionToKill}. */
        private Map<WmTezSession, KillQueryContext> toKillQuery = new IdentityHashMap<>();
        /** Paths collected when initializing sessions are canceled. */
        private List<Path> pathsToDelete = new ArrayList<>();

        private WmThreadSyncWork() {
        }
    }

    /**
     * A request to move a session into another pool, together with the future created for it.
     */
    private static final class MoveSession {
        private final WmTezSession srcSession;
        private final String destPool;
        private final SettableFuture<Boolean> future;

        /**
         * @param srcSession session to move
         * @param destPool name of the destination pool
         */
        public MoveSession(WmTezSession srcSession, String destPool) {
            this.future = SettableFuture.create();
            this.destPool = destPool;
            this.srcSession = srcSession;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(this.srcSession.getSessionId());
            text.append(" moving from ").append(this.srcSession.getPoolName());
            text.append(" to ").append(this.destPool);
            return text.toString();
        }
    }

    /**
     * Bag of pending events. Session-keyed collections use identity semantics. Scalar fields
     * start at their Java defaults ({@code null} / {@code false}).
     */
    private static final class EventState {
        private final Set<WmTezSession> toReturn = Sets.newIdentityHashSet();
        private final Set<WmTezSession> toDestroy = Sets.newIdentityHashSet();
        private final Map<WmTezSession, Boolean> killQueryResults = new IdentityHashMap<>();
        private final LinkedList<SessionInitContext> initResults = new LinkedList<>();
        private final IdentityHashMap<WmTezSession, SettableFuture<WmTezSession>> toReopen = new IdentityHashMap<>();
        private final IdentityHashMap<WmTezSession, Integer> updateErrors = new IdentityHashMap<>();
        /** Pending session requests; {@code PoolState.update} pushes re-queued requests to its front. */
        private final LinkedList<GetRequest> getRequests = new LinkedList<>();
        /** Requests that want to reuse a session, keyed by that session. */
        private final IdentityHashMap<WmTezSession, GetRequest> toReuse = new IdentityHashMap<>();
        private WMFullResourcePlan resourcePlanToApply;
        private boolean doClearResourcePlan;
        private boolean hasClusterStateChanged;
        private SettableFuture<Boolean> testEvent;
        private SettableFuture<Boolean> applyRpFuture;
        private SettableFuture<List<String>> dumpStateFuture;
        private final List<MoveSession> moveSessions = new LinkedList<>();

        private EventState() {
        }
    }
}

