/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard.enrichment;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.calcite.RecordPathFunctions;
import org.apache.nifi.processors.standard.enrichment.RecordJoinInput;
import org.apache.nifi.processors.standard.enrichment.SqlJoinCalciteParameters;
import org.apache.nifi.queryrecord.FlowFileTable;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;

public class SqlJoinCache
implements AutoCloseable {
    private final ComponentLog logger;
    private final Cache<Tuple<String, RecordSchema>, BlockingQueue<SqlJoinCalciteParameters>> calciteParameterQueues = Caffeine.newBuilder().maximumSize(25L).removalListener(this::onCacheEviction).build();

    public SqlJoinCache(ComponentLog logger) {
        this.logger = logger;
    }

    public SqlJoinCalciteParameters getCalciteParameters(String sql, ProcessSession session, RecordSchema schema, RecordJoinInput originalInput, RecordJoinInput enrichmentInput) throws SQLException {
        Tuple tuple = new Tuple((Object)sql, (Object)schema);
        BlockingQueue queue = (BlockingQueue)this.calciteParameterQueues.get((Object)tuple, key -> new LinkedBlockingQueue());
        SqlJoinCalciteParameters cachedStmt = (SqlJoinCalciteParameters)queue.poll();
        if (cachedStmt != null) {
            return cachedStmt;
        }
        return this.createCalciteParameters(sql, session, originalInput, enrichmentInput, queue);
    }

    public void returnCalciteParameters(String sql, RecordSchema schema, SqlJoinCalciteParameters parameters) {
        Tuple tuple = new Tuple((Object)sql, (Object)schema);
        BlockingQueue queue = (BlockingQueue)this.calciteParameterQueues.getIfPresent((Object)tuple);
        if (queue == null || !queue.offer(parameters)) {
            parameters.close();
        }
    }

    private SqlJoinCalciteParameters createCalciteParameters(String sql, ProcessSession session, RecordJoinInput originalInput, RecordJoinInput enrichmentInput, BlockingQueue<SqlJoinCalciteParameters> parameterQueue) throws SQLException {
        CalciteConnection connection = this.createCalciteConnection();
        SchemaPlus rootSchema = RecordPathFunctions.createRootSchema(connection);
        FlowFileTable originalTable = new FlowFileTable(session, originalInput.getFlowFile(), originalInput.getRecordSchema(), originalInput.getRecordReaderFactory(), this.logger);
        rootSchema.add("ORIGINAL", (Table)originalTable);
        FlowFileTable enrichmentTable = new FlowFileTable(session, enrichmentInput.getFlowFile(), enrichmentInput.getRecordSchema(), enrichmentInput.getRecordReaderFactory(), this.logger);
        rootSchema.add("ENRICHMENT", (Table)enrichmentTable);
        rootSchema.setCacheEnabled(false);
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        return new SqlJoinCalciteParameters(sql, connection, preparedStatement, originalTable, enrichmentTable);
    }

    private CalciteConnection createCalciteConnection() {
        Properties properties = new Properties();
        properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
        properties.put(CalciteConnectionProperty.TIME_ZONE, "UTC");
        try {
            Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
            CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
            return calciteConnection;
        }
        catch (Exception e) {
            throw new ProcessException((Throwable)e);
        }
    }

    private void onCacheEviction(Tuple<String, RecordSchema> key, BlockingQueue<SqlJoinCalciteParameters> queue, RemovalCause cause) {
        this.clearQueue(queue);
    }

    private void clearQueue(BlockingQueue<SqlJoinCalciteParameters> parameterQueue) {
        SqlJoinCalciteParameters parameters;
        while ((parameters = (SqlJoinCalciteParameters)parameterQueue.poll()) != null) {
            parameters.close();
        }
    }

    @Override
    public void close() throws Exception {
        for (BlockingQueue statementQueue : this.calciteParameterQueues.asMap().values()) {
            this.clearQueue(statementQueue);
        }
        this.calciteParameterQueues.invalidateAll();
    }
}

