/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.service.session;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.api.utils.ThreadUtils;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.session.Session;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionManagerImpl
implements SessionManager {
    private static final Logger LOG = LoggerFactory.getLogger(SessionManagerImpl.class);
    private static final String OPERATION_POOL_NAME = "sql-gateway-operation-pool";
    private final DefaultContext defaultContext;
    private final long idleTimeout;
    private final long checkInterval;
    private final int maxSessionCount;
    private final Map<SessionHandle, Session> sessions;
    private ExecutorService operationExecutorService;
    @Nullable
    private ScheduledExecutorService cleanupService;
    @Nullable
    private ScheduledFuture<?> timeoutCheckerFuture;

    public SessionManagerImpl(DefaultContext defaultContext) {
        this.defaultContext = defaultContext;
        Configuration conf = defaultContext.getFlinkConfig();
        this.idleTimeout = ((Duration)conf.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT)).toMillis();
        this.checkInterval = ((Duration)conf.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL)).toMillis();
        this.maxSessionCount = (Integer)conf.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM);
        this.sessions = new ConcurrentHashMap<SessionHandle, Session>();
    }

    @Override
    public void start() {
        if (this.checkInterval > 0L && this.idleTimeout > 0L) {
            this.cleanupService = Executors.newSingleThreadScheduledExecutor();
            this.timeoutCheckerFuture = this.cleanupService.scheduleAtFixedRate(() -> {
                LOG.debug("Start to cleanup expired sessions, current session count: {}", (Object)this.sessions.size());
                for (Map.Entry<SessionHandle, Session> entry : this.sessions.entrySet()) {
                    SessionHandle sessionId = entry.getKey();
                    Session session = entry.getValue();
                    if (!this.isSessionExpired(session)) continue;
                    LOG.info("Session {} is expired, closing it...", (Object)sessionId);
                    this.closeSession(session);
                }
                LOG.debug("Removing expired session finished, current session count: {}", (Object)this.sessions.size());
            }, this.checkInterval, this.checkInterval, TimeUnit.MILLISECONDS);
        }
        Configuration conf = this.defaultContext.getFlinkConfig();
        this.operationExecutorService = ThreadUtils.newThreadPool((Integer)conf.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN), (Integer)conf.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX), ((Duration)conf.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME)).toMillis(), OPERATION_POOL_NAME);
    }

    @Override
    public void stop() {
        if (this.cleanupService != null) {
            this.timeoutCheckerFuture.cancel(true);
            this.cleanupService.shutdown();
        }
        if (this.operationExecutorService != null) {
            this.operationExecutorService.shutdown();
        }
        LOG.info("SessionManager is stopped.");
    }

    @Override
    public Session getSession(SessionHandle sessionHandle) throws SqlGatewayException {
        Session session = this.sessions.get(sessionHandle);
        if (session == null) {
            String msg = String.format("Session '%s' does not exist.", sessionHandle);
            LOG.warn(msg);
            throw new SqlGatewayException(msg);
        }
        session.touch();
        return session;
    }

    @Override
    public synchronized Session openSession(SessionEnvironment environment) throws SqlGatewayException {
        this.checkSessionCount();
        Session session = null;
        SessionHandle sessionId = null;
        while (this.sessions.containsKey(sessionId = SessionHandle.create())) {
        }
        SessionContext sessionContext = SessionContext.create(this.defaultContext, sessionId, environment, this.operationExecutorService);
        session = new Session(sessionContext);
        session.open();
        this.sessions.put(sessionId, session);
        LOG.info("Session {} is opened, and the number of current sessions is {}.", (Object)session.getSessionHandle(), (Object)this.sessions.size());
        return session;
    }

    @Override
    public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException {
        Session session = this.getSession(sessionHandle);
        this.closeSession(session);
    }

    private void checkSessionCount() throws SqlGatewayException {
        if (this.maxSessionCount <= 0) {
            return;
        }
        if (this.sessions.size() >= this.maxSessionCount) {
            String msg = String.format("Failed to create session, the count of active sessions exceeds the max count: %s", this.maxSessionCount);
            LOG.warn(msg);
            throw new SqlGatewayException(msg);
        }
    }

    private boolean isSessionExpired(Session session) {
        if (this.idleTimeout > 0L) {
            return System.currentTimeMillis() - session.getLastAccessTime() > this.idleTimeout;
        }
        return false;
    }

    private void closeSession(Session session) {
        SessionHandle sessionId = session.getSessionHandle();
        this.sessions.remove(sessionId);
        session.close();
        LOG.info("Session: {} is closed.", (Object)sessionId);
    }

    @VisibleForTesting
    public boolean isSessionAlive(SessionHandle sessionId) {
        return this.sessions.containsKey(sessionId);
    }

    @VisibleForTesting
    public int currentSessionCount() {
        return this.sessions.size();
    }

    @VisibleForTesting
    public int getOperationCount(SessionHandle sessionHandle) {
        return this.getSession(sessionHandle).getOperationManager().getOperationCount();
    }
}

