/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.livy;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.config.Lookup;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.api.livy.LivySessionService;
import org.apache.nifi.controller.api.livy.exception.SessionManagerException;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosKeytabCredentials;
import org.apache.nifi.hadoop.KerberosKeytabSPNegoAuthSchemeProvider;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

@DeprecationNotice(reason="Unmaintained and planned for removal in version 2.0")
@Tags(value={"Livy", "REST", "Spark", "http"})
@CapabilityDescription(value="Manages pool of Spark sessions over HTTP")
public class LivySessionController
extends AbstractControllerService
implements LivySessionService {
    // ---- Configurable properties (registered in init(), read in onEnabled()) ----

    // Host and port are concatenated into the Livy base URL in onEnabled().
    public static final PropertyDescriptor LIVY_HOST = new PropertyDescriptor.Builder().name("livy-cs-livy-host").displayName("Livy Host").description("The hostname (or IP address) of the Livy server.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor LIVY_PORT = new PropertyDescriptor.Builder().name("livy-cs-livy-port").displayName("Livy Port").description("The port number for the Livy server.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("8998").build();
    // Target number of sessions that manageSessions() tries to keep registered.
    public static final PropertyDescriptor SESSION_POOL_SIZE = new PropertyDescriptor.Builder().name("livy-cs-session-pool-size").displayName("Session Pool Size").description("Number of sessions to keep open").required(true).defaultValue("2").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    // Sent as the "kind" of every session opened, and used to filter sessions by kind.
    public static final PropertyDescriptor SESSION_TYPE = new PropertyDescriptor.Builder().name("livy-cs-session-kind").displayName("Session Type").description("The type of Spark session to start (spark, pyspark, pyspark3, sparkr, e.g.)").required(true).allowableValues(new String[]{"spark", "pyspark", "pyspark3", "sparkr"}).defaultValue("spark").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Sleep time of the session manager thread between two manageSessions() runs.
    public static final PropertyDescriptor SESSION_MGR_STATUS_INTERVAL = new PropertyDescriptor.Builder().name("livy-cs-session-manager-status-interval").displayName("Session Manager Status Interval").description("The amount of time to wait between requesting session information updates.").required(true).defaultValue("2 sec").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    // Comma-separated resource lists; openSession() splits them on "," into JSON arrays.
    public static final PropertyDescriptor JARS = new PropertyDescriptor.Builder().name("livy-cs-session-jars").displayName("Session JARs").description("JARs to be used in the Spark session.").required(false).identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, new ResourceType[0]).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor FILES = new PropertyDescriptor.Builder().name("livy-cs-session-files").displayName("Session Files").description("Files to be used in the Spark session.").required(false).identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, new ResourceType[0]).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue(null).build();
    // When set, onEnabled() builds an "https" URL instead of "http".
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.").required(false).identifiesControllerService(SSLContextService.class).build();
    // Applied by openConnection() as connect, connection-request and socket timeout alike.
    public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder().name("Connection Timeout").description("Max wait time for connection to remote service.").required(true).defaultValue("5 secs").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    // When set, openConnection() registers keytab credentials and a "Negotiate" auth scheme.
    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder().name("kerberos-credentials-service").displayName("Kerberos Credentials Service").description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos").identifiesControllerService(KerberosCredentialsService.class).required(false).build();

    // ---- Runtime state, (re)assigned in onEnabled() ----

    // Base URL of the form http[s]://host:port, without a trailing slash.
    private volatile String livyUrl;
    private volatile int sessionPoolSize;
    // Session kind this controller manages (value of SESSION_TYPE).
    private volatile String controllerKind;
    private volatile String jars;
    private volatile String files;
    // Session id -> most recently stored session JSON. Written by the session manager
    // thread (manageSessions) and read by getSession().
    private final Map<Integer, JSONObject> sessions = new ConcurrentHashMap<Integer, JSONObject>();
    private volatile SSLContextService sslContextService;
    // Context created from sslContextService in onEnabled(); null when no SSL service is set.
    private volatile SSLContext sslContext;
    // Timeout in milliseconds.
    private volatile int connectTimeout;
    // Background thread running the manageSessions() loop; null until onEnabled() has run.
    private volatile Thread livySessionManagerThread = null;
    // Loop flag of the session manager thread; cleared by shutdown() or on interruption.
    private volatile boolean enabled = true;
    private volatile KerberosCredentialsService credentialsService;
    // Failure of the latest manageSessions() run, or null after a successful run.
    // Rethrown to callers by checkSessionManagerException().
    private volatile SessionManagerException sessionManagerException;
    // Unmodifiable descriptor list built in init().
    private List<PropertyDescriptor> properties;

    /**
     * Assembles the unmodifiable list of property descriptors this service supports.
     *
     * @param config initialization context passed in by the framework; not read here
     */
    protected void init(ControllerServiceInitializationContext config) {
        // The list order is the order in which the descriptors are handed back
        // by getSupportedPropertyDescriptors().
        final List<PropertyDescriptor> descriptors = Arrays.asList(
                LIVY_HOST,
                LIVY_PORT,
                SESSION_POOL_SIZE,
                SESSION_TYPE,
                SESSION_MGR_STATUS_INTERVAL,
                SSL_CONTEXT_SERVICE,
                CONNECT_TIMEOUT,
                JARS,
                FILES,
                KERBEROS_CREDENTIALS_SERVICE);
        this.properties = Collections.unmodifiableList(descriptors);
    }

    /**
     * @return the descriptor list stored in {@code properties} by {@code init}
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> supported = this.properties;
        return supported;
    }

    /**
     * Reads the service configuration, stores it in the instance fields and starts the
     * background thread that repeatedly calls {@code manageSessions()}.
     *
     * <p>The thread runs until {@code enabled} becomes {@code false} or it is interrupted.
     * A failure of one {@code manageSessions()} run is logged and remembered in
     * {@code sessionManagerException}; the loop then continues after the configured interval.
     *
     * @param context configuration context providing the property values
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        final String livyHost = context.getProperty(LIVY_HOST).evaluateAttributeExpressions().getValue();
        final String livyPort = context.getProperty(LIVY_PORT).evaluateAttributeExpressions().getValue();
        final String poolSizeValue = context.getProperty(SESSION_POOL_SIZE).evaluateAttributeExpressions().getValue();
        final String sessionKind = context.getProperty(SESSION_TYPE).getValue();
        final long sessionManagerStatusInterval = context.getProperty(SESSION_MGR_STATUS_INTERVAL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
        final String jars = context.getProperty(JARS).evaluateAttributeExpressions().getValue();
        final String files = context.getProperty(FILES).evaluateAttributeExpressions().getValue();

        this.sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        this.sslContext = this.sslContextService == null ? null : this.sslContextService.createContext();
        this.connectTimeout = Math.toIntExact(context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS));
        this.credentialsService = (KerberosCredentialsService)context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);

        // The scheme follows the presence of an SSL context service.
        this.livyUrl = "http" + (this.sslContextService != null ? "s" : "") + "://" + livyHost + ":" + livyPort;
        this.controllerKind = sessionKind;
        this.jars = jars;
        this.files = files;
        this.sessionPoolSize = Integer.parseInt(poolSizeValue);

        // Do not let a failure recorded before a previous disable leak into this run.
        this.sessionManagerException = null;
        this.enabled = true;

        this.livySessionManagerThread = new Thread(() -> {
            while (this.enabled) {
                try {
                    this.manageSessions();
                    this.sessionManagerException = null;
                }
                catch (InterruptedException e) {
                    // An interrupt is the stop request sent by shutdown(), not a session
                    // manager failure: restore the flag and leave the loop right away
                    // instead of reporting an error and sleeping one more interval.
                    Thread.currentThread().interrupt();
                    this.enabled = false;
                    break;
                }
                catch (Exception e) {
                    this.getLogger().error("Livy Session Manager Thread run into an error, but continues to run", (Throwable)e);
                    this.sessionManagerException = new SessionManagerException((Throwable)e);
                }
                try {
                    Thread.sleep(sessionManagerStatusInterval);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.enabled = false;
                }
            }
        });
        this.livySessionManagerThread.setName("Livy-Session-Manager-" + this.controllerKind);
        this.livySessionManagerThread.start();
    }

    /**
     * Stops the session manager thread: clears the {@code enabled} flag, interrupts the
     * thread and waits for it to terminate.
     *
     * <p>Does nothing beyond clearing the flag when no thread was ever started, which is
     * the case if {@code onEnabled} failed before creating it.
     */
    @OnDisabled
    public void shutdown() {
        this.enabled = false;

        final Thread managerThread = this.livySessionManagerThread;
        if (managerThread == null) {
            // onEnabled() never got as far as starting the thread; nothing to stop.
            return;
        }

        managerThread.interrupt();
        try {
            managerThread.join();
        }
        catch (InterruptedException e) {
            // The disabling thread itself was interrupted while waiting; keep the flag set.
            Thread.currentThread().interrupt();
            this.getLogger().error("Livy Session Manager Thread interrupted");
        }
    }

    /**
     * Looks for an idle session of this controller's kind in the session pool.
     *
     * @return a map with the keys {@code "sessionId"} and {@code "livyUrl"} for the first
     *         matching session, or an empty map when none matches or a pooled session
     *         lacks the {@code state}/{@code kind} fields
     * @throws SessionManagerException if the latest session manager run failed
     */
    public Map<String, String> getSession() throws SessionManagerException {
        this.checkSessionManagerException();
        final Map<String, String> sessionMap = new HashMap<String, String>();
        try {
            // Iterate over entries rather than keys: the session manager thread removes
            // sessions concurrently, so a separate get(key) could return null.
            for (final Map.Entry<Integer, JSONObject> entry : this.sessions.entrySet()) {
                final JSONObject currentSession = entry.getValue();
                final String state = currentSession.getString("state");
                final String sessionKind = currentSession.getString("kind");
                if (state.equalsIgnoreCase("idle") && sessionKind.equalsIgnoreCase(this.controllerKind)) {
                    sessionMap.put("sessionId", String.valueOf(entry.getKey()));
                    sessionMap.put("livyUrl", this.livyUrl);
                    break;
                }
            }
        }
        catch (JSONException e) {
            this.getLogger().error("Unexpected data found when looking for JSON object with 'state' and 'kind' fields", (Throwable)e);
        }
        return sessionMap;
    }

    /**
     * Hands out a freshly built HTTP client configured for the Livy server.
     *
     * @return a new client created by {@code openConnection()}
     * @throws IOException if the client cannot be set up
     * @throws SessionManagerException if the latest session manager run failed
     */
    public HttpClient getConnection() throws IOException, SessionManagerException {
        // Surface a pending session manager failure before building anything.
        this.checkSessionManagerException();
        final HttpClient client = this.openConnection();
        return client;
    }

    /**
     * Builds a new HTTP client: TLS when an SSL context service is configured, keytab-based
     * "Negotiate" authentication when a Kerberos credentials service is configured, and the
     * configured timeout applied to connect, connection-request and socket waits.
     *
     * @return a new client instance on every call
     * @throws IOException if the SSL context cannot be prepared
     */
    private HttpClient openConnection() throws IOException {
        final HttpClientBuilder builder = HttpClientBuilder.create();

        if (this.sslContextService != null) {
            try {
                builder.setSSLContext(this.getSslSocketFactory(this.sslContextService));
            }
            catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException | CertificateException e) {
                // Callers only deal with IOException; keep the security exception as cause.
                throw new IOException(e);
            }
        }

        if (this.credentialsService != null) {
            final Credentials keytabCredentials = new KerberosKeytabCredentials(this.credentialsService.getPrincipal(), this.credentialsService.getKeytab());
            final BasicCredentialsProvider provider = new BasicCredentialsProvider();
            provider.setCredentials(new AuthScope(null, -1, null), keytabCredentials);
            builder.setDefaultCredentialsProvider((CredentialsProvider)provider);

            final Registry negotiateRegistry = RegistryBuilder.create().register("Negotiate", (Object)new KerberosKeytabSPNegoAuthSchemeProvider()).build();
            builder.setDefaultAuthSchemeRegistry((Lookup)negotiateRegistry);
        }

        // One timeout value is used for all three waits.
        final RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(this.connectTimeout)
                .setConnectionRequestTimeout(this.connectTimeout)
                .setSocketTimeout(this.connectTimeout)
                .build();
        builder.setDefaultRequestConfig(requestConfig);

        return builder.build();
    }

    /**
     * Reconciles the local session pool with the sessions currently reported by Livy and
     * opens new sessions where the pool needs them.
     *
     * <ol>
     *   <li>Fetch the current session snapshot; seed the pool from it if the pool is empty.</li>
     *   <li>For every pooled session: drop it if it is missing from the snapshot or if its
     *       reported kind/state is not (this controller's kind and idle/busy/starting);
     *       otherwise store the fresh session info.</li>
     *   <li>If the pool is empty, open {@code sessionPoolSize} sessions. Otherwise open one
     *       session when none is idle, then top the pool up to {@code sessionPoolSize}.</li>
     * </ol>
     *
     * <p>Connection failures and socket timeouts are logged and otherwise ignored.
     *
     * @throws InterruptedException if interrupted while waiting for a new session to start
     * @throws IOException if a request fails or a response lacks an expected JSON field
     */
    private void manageSessions() throws InterruptedException, IOException {
        final ComponentLog log = this.getLogger();
        final int poolSize = this.sessionPoolSize;
        int idleSessions = 0;
        try {
            final Map<Integer, JSONObject> sessionsInfo = this.listSessions();
            if (this.sessions.isEmpty()) {
                log.debug("manageSessions() the active session list is empty, populating from acquired list...");
                this.sessions.putAll(sessionsInfo);
            }

            // Iterate over a copy of the keys because entries are removed while looping.
            for (final Integer sessionId : new ArrayList<Integer>(this.sessions.keySet())) {
                log.debug("manageSessions() Updating current session: {}", new Object[]{this.sessions.get(sessionId)});

                final JSONObject latestInfo = sessionsInfo.get(sessionId);
                if (latestInfo == null) {
                    log.debug("manageSessions() session exists in session pool but not in source snapshot, removing from pool...");
                    this.sessions.remove(sessionId);
                    continue;
                }

                // Decide on the state Livy reports now, not on the copy cached during the
                // previous run; otherwise a session that died stays in the pool (and may be
                // counted as idle) for one more cycle.
                final String state = latestInfo.getString("state");
                final String sessionKind = latestInfo.getString("kind");
                log.debug("manageSessions() controller kind: {}, session kind: {}, session state: {}", new Object[]{this.controllerKind, sessionKind, state});

                final boolean sameKind = sessionKind.equalsIgnoreCase(this.controllerKind);
                final boolean idle = state.equalsIgnoreCase("idle");
                final boolean inUseOrStarting = state.equalsIgnoreCase("busy") || state.equalsIgnoreCase("starting");
                if (sameKind && (idle || inUseOrStarting)) {
                    if (idle) {
                        ++idleSessions;
                    }
                    this.sessions.put(sessionId, latestInfo);
                } else {
                    // Foreign kind, or a state such as dead/error: not usable by this pool.
                    this.sessions.remove(sessionId);
                }
            }

            final int numSessions = this.sessions.size();
            log.debug("manageSessions() There are {} sessions in the pool", new Object[]{numSessions});
            if (numSessions == 0) {
                for (int i = 0; i < poolSize; ++i) {
                    this.registerNewSession(log);
                }
            } else {
                if (idleSessions == 0) {
                    log.debug("manageSessions() There are {} sessions in the pool but none of them are idle sessions, creating...", new Object[]{numSessions});
                    this.registerNewSession(log);
                }
                // Measure the shortfall after the step above so that the session just opened
                // counts towards the pool size instead of overshooting it by one.
                final int currentSize = this.sessions.size();
                final int shortfall = poolSize - currentSize;
                if (shortfall > 0) {
                    log.debug("manageSessions() There are {}, need more sessions to equal requested pool size of {}, creating...", new Object[]{currentSize, poolSize});
                    for (int i = 0; i < shortfall; ++i) {
                        this.registerNewSession(log);
                    }
                }
            }
        }
        catch (ConnectException | SocketTimeoutException ce) {
            log.error("Timeout connecting to Livy service to retrieve sessions", (Throwable)ce);
        }
        catch (JSONException e) {
            throw new IOException(e);
        }
    }

    /** Opens one session through {@code openSession()} and stores it in the pool under its id. */
    private void registerNewSession(final ComponentLog log) throws IOException, JSONException, InterruptedException {
        final JSONObject newSessionInfo = this.openSession();
        this.sessions.put(newSessionInfo.getInt("id"), newSessionInfo);
        log.debug("manageSessions() Registered new session: {}", new Object[]{newSessionInfo});
    }

    /**
     * Requests {@code <livyUrl>/sessions} and indexes the entries of the returned
     * {@code "sessions"} array by their {@code "id"} field.
     *
     * @return a new mutable map from session id to session JSON
     * @throws IOException if the request fails or the response lacks the expected fields
     */
    private Map<Integer, JSONObject> listSessions() throws IOException {
        final Map<String, String> requestHeaders = new HashMap<String, String>();
        requestHeaders.put("Content-Type", "application/json");
        requestHeaders.put("X-Requested-By", "nifi");

        final HashMap<Integer, JSONObject> byId = new HashMap<Integer, JSONObject>();
        try {
            final JSONObject response = this.readJSONFromUrl(this.livyUrl + "/sessions", requestHeaders);
            final int count = response.getJSONArray("sessions").length();
            int index = 0;
            while (index < count) {
                final JSONObject session = response.getJSONArray("sessions").getJSONObject(index);
                byId.put(session.getInt("id"), session);
                ++index;
            }
        }
        catch (JSONException e) {
            // Malformed or unexpected JSON is reported to callers as an I/O problem.
            throw new IOException(e);
        }
        return byId;
    }

    /**
     * Requests {@code <livyUrl>/sessions/<sessionId>} and returns the parsed response.
     *
     * @param sessionId id of the session to look up
     * @return the JSON object returned for that session
     * @throws IOException if the request fails or the response is not a JSON object
     */
    private JSONObject getSessionInfo(int sessionId) throws IOException {
        final Map<String, String> requestHeaders = new HashMap<String, String>();
        requestHeaders.put("Content-Type", "application/json");
        requestHeaders.put("X-Requested-By", "nifi");
        try {
            return this.readJSONFromUrl(this.livyUrl + "/sessions/" + sessionId, requestHeaders);
        }
        catch (JSONException e) {
            throw new IOException(e);
        }
    }

    /**
     * Creates a session by POSTing to {@code <livyUrl>/sessions} and then polls the session
     * once per second until its {@code "state"} is no longer {@code "starting"}.
     *
     * <p>The request body carries the controller's session kind plus, when configured, the
     * {@code "jars"} and {@code "files"} lists.
     *
     * @return the session JSON as last reported, in whatever state followed "starting"
     * @throws IOException if a request fails
     * @throws JSONException if a response lacks the {@code "state"} or {@code "id"} field
     * @throws InterruptedException if interrupted while waiting between polls
     */
    private JSONObject openSession() throws IOException, JSONException, InterruptedException {
        final ComponentLog log = this.getLogger();
        final ObjectMapper mapper = new ObjectMapper();
        final String sessionsUrl = this.livyUrl + "/sessions";

        final StringBuilder payload = new StringBuilder();
        payload.append("{\"kind\":\"").append(this.controllerKind).append("\"");
        final String jarList = this.jars;
        if (jarList != null) {
            final String jarsJson = mapper.writeValueAsString(LivySessionController.splitResourceList(jarList));
            payload.append(",\"jars\":").append(jarsJson);
        }
        final String fileList = this.files;
        if (fileList != null) {
            final String filesJson = mapper.writeValueAsString(LivySessionController.splitResourceList(fileList));
            payload.append(",\"files\":").append(filesJson);
        }
        payload.append("}");
        final String body = payload.toString();
        log.debug("openSession() Session Payload: " + body);

        final Map<String, String> requestHeaders = new HashMap<String, String>();
        requestHeaders.put("Content-Type", "application/json");
        requestHeaders.put("X-Requested-By", "nifi");

        JSONObject sessionInfo = this.readJSONObjectFromUrlPOST(sessionsUrl, requestHeaders, body);
        Thread.sleep(1000L);
        while (sessionInfo.getString("state").equalsIgnoreCase("starting")) {
            log.debug("openSession() Waiting for session to start...");
            sessionInfo = this.getSessionInfo(sessionInfo.getInt("id"));
            log.debug("openSession() newSessionInfo: " + sessionInfo);
            Thread.sleep(1000L);
        }
        return sessionInfo;
    }

    /** Splits a comma-separated value into its non-blank, trimmed items, keeping their order. */
    private static List<String> splitResourceList(final String commaSeparated) {
        final List<String> items = new ArrayList<String>();
        for (final String token : commaSeparated.split(",")) {
            if (StringUtils.isNotBlank(token)) {
                items.add(token.trim());
            }
        }
        return items;
    }

    /**
     * POSTs {@code payload} to {@code urlString} and parses the response body as a JSON object.
     *
     * <p>The client created for the request and the response stream are released before
     * the method returns, on both the success and the failure path.
     *
     * @param urlString target URL
     * @param headers request headers to add, name to value
     * @param payload request body; sent UTF-8 encoded
     * @return the parsed response body
     * @throws IOException if the request fails or the response carries no body
     * @throws JSONException if the response body is not a JSON object
     * @throws RuntimeException if the response status is neither 200 nor 201
     */
    private JSONObject readJSONObjectFromUrlPOST(String urlString, Map<String, String> headers, String payload) throws IOException, JSONException {
        final HttpClient httpClient = this.openConnection();
        try {
            final HttpPost request = new HttpPost(urlString);
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                request.addHeader(entry.getKey(), entry.getValue());
            }
            // Encode explicitly: the single-argument StringEntity constructor falls back to
            // ISO-8859-1, which cannot represent non-ASCII characters in jar/file paths.
            request.setEntity((HttpEntity)new StringEntity(payload, StandardCharsets.UTF_8));

            final HttpResponse response = httpClient.execute((HttpUriRequest)request);
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200 && statusCode != 201) {
                throw new RuntimeException("Failed : HTTP error code : " + statusCode + " : " + response.getStatusLine().getReasonPhrase());
            }

            final HttpEntity responseEntity = response.getEntity();
            if (responseEntity == null) {
                throw new IOException("HTTP " + statusCode + " response from " + urlString + " has no body");
            }
            try (InputStream content = responseEntity.getContent()) {
                return this.readAllIntoJSONObject(content);
            }
        }
        finally {
            // openConnection() builds a new client per call; close it so its connection
            // manager and sockets do not pile up. The declared type is not Closeable,
            // hence the instanceof check.
            if (httpClient instanceof Closeable) {
                try {
                    ((Closeable)httpClient).close();
                }
                catch (IOException ignored) {
                    // Best-effort cleanup; the outcome of the request is already decided.
                }
            }
        }
    }

    /**
     * GETs {@code urlString} and parses the response body as a JSON object.
     *
     * <p>The client created for the request and the response stream are released before
     * the method returns, on both the success and the failure path.
     *
     * @param urlString target URL
     * @param headers request headers to add, name to value
     * @return the parsed response body
     * @throws IOException if the request fails, the status is not 2xx, or there is no body
     * @throws JSONException if the response body is not a JSON object
     */
    private JSONObject readJSONFromUrl(String urlString, Map<String, String> headers) throws IOException, JSONException {
        final HttpClient httpClient = this.openConnection();
        try {
            final HttpGet request = new HttpGet(urlString);
            for (final Map.Entry<String, String> entry : headers.entrySet()) {
                request.addHeader(entry.getKey(), entry.getValue());
            }

            final HttpResponse response = httpClient.execute((HttpUriRequest)request);
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode < 200 || statusCode >= 300) {
                // Report the HTTP failure itself rather than parsing an error body as if
                // it were session data and failing later on a missing JSON field.
                throw new IOException("Failed : HTTP error code : " + statusCode + " : " + response.getStatusLine().getReasonPhrase() + " for " + urlString);
            }

            final HttpEntity responseEntity = response.getEntity();
            if (responseEntity == null) {
                throw new IOException("HTTP " + statusCode + " response from " + urlString + " has no body");
            }
            try (InputStream content = responseEntity.getContent()) {
                return this.readAllIntoJSONObject(content);
            }
        }
        finally {
            // openConnection() builds a new client per call; close it so its connection
            // manager and sockets do not pile up. The declared type is not Closeable,
            // hence the instanceof check.
            if (httpClient instanceof Closeable) {
                try {
                    ((Closeable)httpClient).close();
                }
                catch (IOException ignored) {
                    // Best-effort cleanup; the outcome of the request is already decided.
                }
            }
        }
    }

    /**
     * Reads the whole stream as UTF-8 text and parses it as a JSON object.
     * The stream is not closed here.
     *
     * @param content stream holding the JSON text
     * @return the parsed object
     * @throws IOException if reading fails
     * @throws JSONException if the text is not a JSON object
     */
    private JSONObject readAllIntoJSONObject(InputStream content) throws IOException, JSONException {
        final Reader utf8Reader = new InputStreamReader(content, StandardCharsets.UTF_8);
        return new JSONObject(IOUtils.toString(utf8Reader));
    }

    /**
     * Initialises {@code this.sslContext} with key and trust material loaded from the
     * files named by the given SSL context service, and returns it.
     *
     * <p>Keystore and truststore are each optional: when a store file is not configured,
     * {@code null} is passed to {@link SSLContext#init} for that part, which makes the
     * JSSE provider fall back to its defaults.
     *
     * <p>NOTE(review): the shared context is re-initialised on every call; confirm whether
     * the context obtained from {@code createContext()} in {@code onEnabled} could be used
     * as is. The keystore password is also used as the key password - confirm this is
     * intended for stores whose key password differs.
     *
     * @param sslService source of the store locations, types and passwords
     * @return {@code this.sslContext} after initialisation
     * @throws IOException if a store file cannot be read
     */
    private SSLContext getSslSocketFactory(SSLContextService sslService) throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException {
        KeyManagerFactory keyManagerFactory = null;
        final String keystoreLocation = sslService.getKeyStoreFile();
        if (StringUtils.isNotBlank(keystoreLocation)) {
            final String keystorePass = sslService.getKeyStorePassword();
            final char[] keystoreSecret = keystorePass == null ? null : keystorePass.toCharArray();
            final KeyStore keyStore = KeyStore.getInstance(sslService.getKeyStoreType());
            try (InputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
                keyStore.load(keyStoreStream, keystoreSecret);
            }
            keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            keyManagerFactory.init(keyStore, keystoreSecret);
        }

        TrustManagerFactory trustManagerFactory = null;
        final String truststoreLocation = sslService.getTrustStoreFile();
        if (StringUtils.isNotBlank(truststoreLocation)) {
            final String truststorePass = sslService.getTrustStorePassword();
            final char[] truststoreSecret = truststorePass == null ? null : truststorePass.toCharArray();
            final KeyStore truststore = KeyStore.getInstance(sslService.getTrustStoreType());
            // Close the file once loaded; it used to be left open on every call.
            try (InputStream trustStoreStream = new FileInputStream(truststoreLocation)) {
                truststore.load(trustStoreStream, truststoreSecret);
            }
            trustManagerFactory = TrustManagerFactory.getInstance("X509");
            trustManagerFactory.init(truststore);
        }

        this.sslContext.init(
                keyManagerFactory == null ? null : keyManagerFactory.getKeyManagers(),
                trustManagerFactory == null ? null : trustManagerFactory.getTrustManagers(),
                null);
        return this.sslContext;
    }

    /**
     * Rethrows the failure recorded by the latest session manager run, if there is one.
     *
     * @throws SessionManagerException the recorded failure
     */
    private void checkSessionManagerException() throws SessionManagerException {
        // Read the volatile field once: the session manager thread may reset it to null at
        // any time, and throwing a re-read null reference would raise a NullPointerException.
        final SessionManagerException exception = this.sessionManagerException;
        if (exception != null) {
            throw exception;
        }
    }
}

