/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.jms.processors;

import jakarta.jms.ConnectionFactory;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.jms.cf.IJMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JMSConnectionFactoryHandler;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryHandler;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
import org.apache.nifi.jms.processors.JMSWorker;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsTemplate;

public abstract class AbstractJMSProcessor<T extends JMSWorker>
extends AbstractProcessor {
    /** Allowable value of {@link #DESTINATION_TYPE}; also its default. */
    static final String QUEUE = "QUEUE";
    /** Allowable value of {@link #DESTINATION_TYPE}; switches the JmsTemplate to the pub/sub domain. */
    static final String TOPIC = "TOPIC";
    // Message-format identifiers; not referenced by this class itself (presumably used by subclasses).
    static final String TEXT_MESSAGE = "text";
    static final String BYTES_MESSAGE = "bytes";

    /** Optional user name applied to the credentials adapter that wraps the connection factory. */
    static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
            .name("User Name")
            .description("User Name used for authentication and authorization.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    /** Optional, sensitive password applied to the credentials adapter that wraps the connection factory. */
    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
            .name("Password")
            .description("Password used for authentication and authorization.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .sensitive(true)
            .build();

    /** Required name of the JMS destination. */
    static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
            .name("Destination Name")
            .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic' or 'myTopic').")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /** Required destination type; one of {@link #QUEUE} or {@link #TOPIC}. */
    static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder()
            .name("Destination Type")
            .description("The type of the JMS Destination. Could be one of 'QUEUE' or 'TOPIC'. Usually provided by the administrator. Defaults to 'QUEUE'")
            .required(true)
            .allowableValues(QUEUE, TOPIC)
            .defaultValue(QUEUE)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Optional base client id; {@link #setClientId} appends a per-processor counter to it. */
    static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder()
            .name("Connection Client ID")
            .description("The client id to be set on the connection, if set. For durable non shared consumer this is mandatory, for all others it is optional, typically with shared consumers it is undesirable to be set. Please see JMS spec for further details")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Deprecated property kept only so existing configurations stay valid; it is not read by this class. */
    static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder()
            .name("Session Cache size")
            .displayName("Session Cache Size")
            .description("This property is deprecated and no longer has any effect on the Processor. It will be removed in a later version.")
            .required(false)
            .defaultValue("1")
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();

    /** Required character set name; defaults to the JVM's default charset at class-load time. */
    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("character-set")
            .displayName("Character Set")
            .description("The name of the character set to use to construct or interpret TextMessages")
            .required(true)
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .defaultValue(Charset.defaultCharset().name())
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /** Optional controller service supplying the connection factory; takes precedence in {@link #setupConnectionFactoryProvider}. */
    static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder()
            .name("Connection Factory Service")
            .description("The Controller Service that is used to obtain Connection Factory. Alternatively, the 'JNDI *' or the 'JMS *' properties can also be used to configure the Connection Factory.")
            .required(false)
            .identifiesControllerService(JMSConnectionFactoryProviderDefinition.class)
            .build();

    /** Copies of the JNDI connection factory descriptors with {@code required} forced to {@code false}. */
    static final List<PropertyDescriptor> JNDI_JMS_CF_PROPERTIES = Collections.unmodifiableList(
            JndiJmsConnectionFactoryProperties.getPropertyDescriptors().stream()
                    .map(pd -> new PropertyDescriptor.Builder().fromPropertyDescriptor(pd).required(false).build())
                    .collect(Collectors.toList()));

    /** Copies of the JMS connection factory descriptors with {@code required} forced to {@code false}. */
    static final List<PropertyDescriptor> JMS_CF_PROPERTIES = Collections.unmodifiableList(
            JMSConnectionFactoryProperties.getPropertyDescriptors().stream()
                    .map(pd -> new PropertyDescriptor.Builder().fromPropertyDescriptor(pd).required(false).build())
                    .collect(Collectors.toList()));

    /** Optional record reader service. */
    static final PropertyDescriptor BASE_RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(false)
            .build();

    /** Record writer service; required, and declared as depending on {@link #BASE_RECORD_READER}. */
    static final PropertyDescriptor BASE_RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("record-writer")
            .displayName("Record Writer")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .dependsOn(BASE_RECORD_READER)
            .required(true)
            .build();

    /** Source of connection factories; assigned when scheduled and cleared when unscheduled. */
    private volatile IJMSConnectionFactoryProvider connectionFactoryProvider;
    /** Pool of reusable workers; created when scheduled, so it is {@code null} before the first scheduling. */
    private volatile BlockingQueue<T> workerPool;
    /** Supplies the numeric suffix appended to the configured client id for each new connection factory. */
    private final AtomicInteger clientIdCounter = new AtomicInteger(1);

    /**
     * Reads the {@link #CLIENT_ID} property from the given context and evaluates any
     * expression language it contains.
     *
     * @param context process context the property is looked up in
     * @return the evaluated property value; callers in this class treat {@code null} as "not configured"
     */
    protected static String getClientId(ProcessContext context) {
        final PropertyValue configured = context.getProperty(CLIENT_ID);
        final PropertyValue evaluated = configured.evaluateAttributeExpressions();
        return evaluated.getValue();
    }

    /**
     * Builds the descriptor used for any dynamic (user-added) property. Every such property is
     * described as extra Connection Factory configuration, must be non-empty and supports
     * variable-registry expression language.
     *
     * @param propertyDescriptorName name of the dynamic property
     * @return a freshly built dynamic property descriptor carrying that name
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        final PropertyDescriptor.Builder dynamicProperty = new PropertyDescriptor.Builder()
                .description("Additional configuration property for the Connection Factory")
                .name(propertyDescriptorName)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                .dynamic(true);
        return dynamicProperty.build();
    }

    /**
     * Performs the cross-property validation of the Connection Factory configuration by
     * delegating to a {@link ConnectionFactoryConfigValidator} created for this context.
     *
     * @param validationContext context giving access to the configured property values
     * @return the validation results produced by the validator (empty when nothing is wrong)
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final ConnectionFactoryConfigValidator validator = new ConnectionFactoryConfigValidator(validationContext);
        return validator.validateConnectionFactoryConfig();
    }

    /**
     * Takes a worker from the pool (building a new one when the pool is empty), lets the subclass
     * exchange messages through it via {@link #rendezvousWithJms}, and finally returns a usable
     * worker to the pool.
     *
     * <p>If the worker is missing or reports itself invalid after the exchange, it is shut down,
     * the provider is asked to reset the underlying connection factory, and a replacement worker
     * is built. When rebuilding fails, nothing is put back into the pool, so the next invocation
     * builds a worker from scratch.
     *
     * @param context process context used to build workers and passed on to the subclass
     * @param session process session passed on to the subclass
     * @throws ProcessException as declared by the subclass hook; any exception raised while
     *         building the initial worker is logged, the processor yields, and it is rethrown
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        T worker = this.workerPool.poll();
        if (worker == null) {
            try {
                worker = this.buildTargetResource(context);
            } catch (Exception e) {
                this.getLogger().error("Failed to initialize JMS Connection Factory", e);
                context.yield();
                throw e;
            }
        }
        try {
            this.rendezvousWithJms(context, session, worker);
        } finally {
            if (worker == null || !worker.isValid()) {
                this.getLogger().debug("Worker is invalid. Will try re-create... ");
                try {
                    if (worker != null) {
                        worker.shutdown();
                        // buildTargetResource always installs a CachingConnectionFactory on the template,
                        // so the cast holds for every worker created by this class.
                        CachingConnectionFactory currentCF = (CachingConnectionFactory) worker.jmsTemplate.getConnectionFactory();
                        this.connectionFactoryProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
                    }
                    worker = this.buildTargetResource(context);
                } catch (Exception e) {
                    // Keep the cause in the log; the failure itself is deliberately not propagated.
                    this.getLogger().error("Failed to rebuild:  " + this.connectionFactoryProvider, e);
                    worker = null;
                }
            }
            if (worker != null) {
                // Restore the template's QoS settings before pooling the worker. The literals are the
                // JMS defaults: delivery mode 2 = PERSISTENT, time-to-live 0 = never expires, priority 4.
                worker.jmsTemplate.setExplicitQosEnabled(false);
                worker.jmsTemplate.setDeliveryMode(2);
                worker.jmsTemplate.setTimeToLive(0L);
                worker.jmsTemplate.setPriority(4);
                this.workerPool.offer(worker);
            }
        }
    }

    /**
     * Selects the connection factory provider when the processor is scheduled. The sources are
     * tried in this order: the configured controller service, the local JNDI properties, then the
     * local JMS properties.
     *
     * @param context process context the configuration is read from
     * @throws ProcessException if none of the three sources is configured
     */
    @OnScheduled
    public void setupConnectionFactoryProvider(ProcessContext context) {
        final PropertyValue serviceProperty = context.getProperty(CF_SERVICE);
        if (serviceProperty.isSet()) {
            this.connectionFactoryProvider = (IJMSConnectionFactoryProvider) serviceProperty.asControllerService(JMSConnectionFactoryProviderDefinition.class);
            return;
        }

        final boolean jndiConfigured = context.getProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME).isSet();
        if (jndiConfigured) {
            this.connectionFactoryProvider = new JndiJmsConnectionFactoryHandler(context, this.getLogger());
            return;
        }

        final boolean jmsConfigured = context.getProperty(JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL).isSet();
        if (jmsConfigured) {
            this.connectionFactoryProvider = new JMSConnectionFactoryHandler(context, this.getLogger());
            return;
        }

        throw new ProcessException("No Connection Factory configured.");
    }

    /**
     * Drops the reference to the connection factory provider when the processor is unscheduled.
     * The provider itself is not closed or otherwise touched here.
     *
     * @param context process context (unused)
     */
    @OnUnscheduled
    public void shutdownConnectionFactoryProvider(ProcessContext context) {
        connectionFactoryProvider = null;
    }

    /**
     * Creates a fresh, empty worker pool when the processor is scheduled. The pool is bounded by
     * the number of concurrent tasks configured for the processor.
     *
     * @param context process context supplying the maximum number of concurrent tasks
     */
    @OnScheduled
    public void setupWorkerPool(ProcessContext context) {
        final int capacity = context.getMaxConcurrentTasks();
        this.workerPool = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Drains the worker pool when the processor is stopped and shuts down every pooled worker.
     *
     * <p>The pool is only created by {@link #setupWorkerPool}, so it is still {@code null} if that
     * method never ran (for example when scheduling failed earlier — NOTE(review): confirm the
     * framework can invoke this hook in that situation). In that case there is nothing to release.
     */
    @OnStopped
    public void close() {
        // Work on a snapshot of the volatile field so the null check and the drain see the same queue.
        final BlockingQueue<T> pool = this.workerPool;
        if (pool == null) {
            return;
        }
        for (T worker = pool.poll(); worker != null; worker = pool.poll()) {
            worker.shutdown();
        }
    }

    /**
     * Subclass hook that performs the actual work against JMS using the supplied worker.
     * Invoked once per {@code onTrigger} call.
     *
     * @param var1 the process context that was passed to {@code onTrigger}
     * @param var2 the process session that was passed to {@code onTrigger}
     * @param var3 the worker taken from the pool or freshly built for this invocation
     * @throws ProcessException if the subclass fails to complete the exchange
     */
    protected abstract void rendezvousWithJms(ProcessContext var1, ProcessSession var2, T var3) throws ProcessException;

    /**
     * Subclass hook that creates the concrete worker once the connection factory and template
     * have been prepared by {@code buildTargetResource}.
     *
     * @param var1 the caching connection factory wrapping the credentials adapter
     * @param var2 the JMS template already configured with that factory and the pub/sub domain flag
     * @param var3 the process context the worker is being built for
     * @return the worker to be used (and later pooled) by {@code onTrigger}
     */
    protected abstract T finishBuildingJmsWorker(CachingConnectionFactory var1, JmsTemplate var2, ProcessContext var3);

    /**
     * Assembles a new worker. The provider's connection factory is wrapped in a credentials
     * adapter (user name and password from the processor properties), that adapter is wrapped in
     * a {@link CachingConnectionFactory} which receives the client id, and a {@link JmsTemplate}
     * bound to the caching factory is handed to {@link #finishBuildingJmsWorker}.
     *
     * @param context process context the configuration is read from
     * @return the worker produced by the subclass
     */
    private T buildTargetResource(ProcessContext context) {
        // Layer 1: credentials on top of the provider's connection factory.
        final UserCredentialsConnectionFactoryAdapter credentialsAdapter = new UserCredentialsConnectionFactoryAdapter();
        credentialsAdapter.setTargetConnectionFactory(this.connectionFactoryProvider.getConnectionFactory());
        final String userName = context.getProperty(USER).evaluateAttributeExpressions().getValue();
        credentialsAdapter.setUsername(userName);
        final String password = context.getProperty(PASSWORD).getValue();
        credentialsAdapter.setPassword(password);

        // Layer 2: caching factory carrying the (optional) client id.
        final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(credentialsAdapter);
        this.setClientId(context, cachingConnectionFactory);

        // Template: publish/subscribe domain only when the destination type is TOPIC.
        final JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(cachingConnectionFactory);
        final String destinationType = context.getProperty(DESTINATION_TYPE).getValue();
        template.setPubSubDomain(TOPIC.equals(destinationType));

        return this.finishBuildingJmsWorker(cachingConnectionFactory, template, context);
    }

    /**
     * Applies the configured client id, if any, to the given connection factory. The id actually
     * set is the configured value followed by {@code "-"} and the next value of this processor's
     * client id counter. When no client id is configured the factory is left untouched and the
     * counter is not advanced.
     *
     * @param context           process context the client id is read from
     * @param connectionFactory connection factory that receives the client id
     */
    protected void setClientId(ProcessContext context, SingleConnectionFactory connectionFactory) {
        final String configuredClientId = getClientId(context);
        if (configuredClientId == null) {
            return;
        }
        final int suffix = this.clientIdCounter.getAndIncrement();
        connectionFactory.setClientId(configuredClientId + "-" + suffix);
    }

    /**
     * Checks that exactly one way of configuring the Connection Factory is used: the controller
     * service, the local 'JNDI *' properties, or the local 'JMS *' properties. When one of the
     * local variants is chosen, it also checks that all of its required properties are set.
     */
    static class ConnectionFactoryConfigValidator {
        /** Subject attached to every validation result produced by this class. */
        private static final String SUBJECT = "Connection Factory config";

        private final ValidationContext validationContext;
        private final PropertyValue connectionFactoryServiceProperty;
        private final PropertyValue jndiInitialContextFactoryProperty;
        private final PropertyValue jmsConnectionFactoryImplProperty;

        /**
         * Captures the context and the three properties that indicate which configuration
         * variant is in use.
         *
         * @param validationContext context giving access to the configured property values
         */
        ConnectionFactoryConfigValidator(ValidationContext validationContext) {
            this.validationContext = validationContext;
            this.connectionFactoryServiceProperty = validationContext.getProperty(CF_SERVICE);
            this.jndiInitialContextFactoryProperty = validationContext.getProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY);
            this.jmsConnectionFactoryImplProperty = validationContext.getProperty(JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL);
        }

        /**
         * Runs the validation.
         *
         * @return a new mutable list of invalid results; empty when the configuration is acceptable
         */
        List<ValidationResult> validateConnectionFactoryConfig() {
            final List<ValidationResult> problems = new ArrayList<>();

            // Controller service chosen: no local JNDI/JMS property may be set alongside it.
            if (this.connectionFactoryServiceProperty.isSet()) {
                if (this.hasLocalJndiJmsConnectionFactoryConfig()) {
                    problems.add(invalid(String.format("cannot set both '%s' and 'JNDI *' properties.", CF_SERVICE.getDisplayName())));
                }
                if (this.hasLocalJMSConnectionFactoryConfig()) {
                    problems.add(invalid(String.format("cannot set both '%s' and 'JMS *' properties.", CF_SERVICE.getDisplayName())));
                }
                return problems;
            }

            // Nothing chosen at all.
            if (!this.jndiInitialContextFactoryProperty.isSet() && !this.jmsConnectionFactoryImplProperty.isSet()) {
                final String explanation = String.format(
                        "either '%s', '%s' or '%s' must be specified.",
                        CF_SERVICE.getDisplayName(),
                        JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY.getDisplayName(),
                        JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL.getDisplayName());
                problems.add(invalid(explanation));
                return problems;
            }

            // The two local variants are mutually exclusive.
            if (this.hasLocalJndiJmsConnectionFactoryConfig() && this.hasLocalJMSConnectionFactoryConfig()) {
                problems.add(invalid("cannot set both 'JNDI *' and 'JMS *' properties."));
                return problems;
            }

            // Exactly one local variant: make sure its required properties are present.
            if (this.jndiInitialContextFactoryProperty.isSet()) {
                this.validateLocalConnectionFactoryConfig(
                        JndiJmsConnectionFactoryProperties.getPropertyDescriptors(),
                        JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY,
                        problems);
            } else if (this.jmsConnectionFactoryImplProperty.isSet()) {
                this.validateLocalConnectionFactoryConfig(
                        JMSConnectionFactoryProperties.getPropertyDescriptors(),
                        JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL,
                        problems);
            }
            return problems;
        }

        /** Creates an invalid result with the shared subject and the given explanation. */
        private static ValidationResult invalid(String explanation) {
            return new ValidationResult.Builder()
                    .subject(SUBJECT)
                    .valid(false)
                    .explanation(explanation)
                    .build();
        }

        /** @return {@code true} if any of the local 'JNDI *' properties is set */
        private boolean hasLocalJndiJmsConnectionFactoryConfig() {
            return this.hasLocalConnectionFactoryConfig(JndiJmsConnectionFactoryProperties.getPropertyDescriptors());
        }

        /** @return {@code true} if any of the local 'JMS *' properties is set */
        private boolean hasLocalJMSConnectionFactoryConfig() {
            return this.hasLocalConnectionFactoryConfig(JMSConnectionFactoryProperties.getPropertyDescriptors());
        }

        /**
         * @param localConnectionFactoryProperties descriptors to inspect
         * @return {@code true} as soon as one of the descriptors has a value set in the context
         */
        private boolean hasLocalConnectionFactoryConfig(List<PropertyDescriptor> localConnectionFactoryProperties) {
            return localConnectionFactoryProperties.stream()
                    .anyMatch(descriptor -> this.validationContext.getProperty(descriptor).isSet());
        }

        /**
         * Adds one invalid result for every required descriptor that has no value set.
         *
         * @param localConnectionFactoryProperties descriptors of the chosen local variant
         * @param indicatorProperty                property whose presence selected this variant (named in the message)
         * @param results                          list the invalid results are appended to
         */
        private void validateLocalConnectionFactoryConfig(List<PropertyDescriptor> localConnectionFactoryProperties, PropertyDescriptor indicatorProperty, List<ValidationResult> results) {
            for (PropertyDescriptor descriptor : localConnectionFactoryProperties) {
                if (descriptor.isRequired() && !this.validationContext.getProperty(descriptor).isSet()) {
                    results.add(invalid(String.format(
                            "'%s' must be specified when '%s' has been configured.",
                            descriptor.getDisplayName(),
                            indicatorProperty.getDisplayName())));
                }
            }
        }
    }
}

